diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index d43a89a..a7264ab 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -26,6 +26,8 @@ public interface AdminClient extends BasicClient { Map getMapping(String index, String type) throws IOException; + void checkMapping(String index); + /** * Delete an index. * @param indexDefinition the index definition diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index e7ffc59..9532af5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -13,12 +13,6 @@ import java.util.concurrent.TimeUnit; public interface BulkClient extends BasicClient, Flushable { - /** - * Get bulk metric. - * @return the bulk metric - */ - BulkMetric getBulkMetric(); - /** * Get buulk control. * @return the bulk control diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java index ec375eb..ae3e274 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -36,4 +36,5 @@ public interface BulkController extends Closeable, Flushable { void stopBulkMode(IndexDefinition indexDefinition) throws IOException; void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException; + } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index af825e5..b7d11e0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -10,6 +10,8 @@ public interface BulkMetric extends Closeable { void init(Settings settings); + void markTotalIngest(long n); + Metered getTotalIngest(); Count getTotalIngestSizeInBytes(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index 73cc462..833c31f 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -8,8 +8,7 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { - @SuppressWarnings("rawtypes") - BulkProcessor add(ActionRequest request); + BulkProcessor add(ActionRequest request); boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java index cd4c283..cef89b8 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java @@ -4,7 +4,7 @@ import java.util.Collection; public interface IndexPruneResult { - enum State { NOTHING_TO_DO, SUCCESS, NONE, FAIL }; + enum State { NOTHING_TO_DO, SUCCESS, NONE }; State getState(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java index 3a04949..4932bcf 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java @@ -14,6 +14,8 @@ import java.util.stream.Stream; public interface SearchClient extends BasicClient { + SearchMetric getSearchMetric(); + Optional get(Consumer getRequestBuilder); Optional multiGet(Consumer multiGetRequestBuilder); diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java new file mode 100644 index 0000000..e0cdb96 --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java @@ -0,0 +1,29 @@ +package org.xbib.elx.api; + +import org.elasticsearch.common.settings.Settings; +import org.xbib.metrics.api.Count; +import org.xbib.metrics.api.Metered; +import java.io.Closeable; + +public interface SearchMetric extends Closeable { + + void init(Settings settings); + + void markTotalQueries(long n); + + Metered getTotalQueries(); + + Count getCurrentQueries(); + + Count getQueries(); + + Count getSucceededQueries(); + + Count getEmptyQueries(); + + long elapsed(); + + void start(); + + void stop(); +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index b2ea2ac..4d60cde 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -365,7 +366,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); - logger.info("{} indices", getIndexResponse.getIndices().length); + logger.info("found {} indices", getIndexResponse.getIndices().length); List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); @@ -459,7 +460,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException { - boolean isEnabled = settings.getAsBoolean("enabled", !(client instanceof MockAdminClient)); + boolean isEnabled = settings.getAsBoolean("enabled", false); String indexName = settings.get("name", index); String fullIndexName; String dateTimePattern = settings.get("dateTimePattern"); @@ -504,6 +505,22 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } + @Override + public void checkMapping(String index) { + ensureClientIsPresent(); + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); + GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); + ImmutableOpenMap> map = getMappingsResponse.getMappings(); + map.keys().forEach((Consumer>) stringObjectCursor -> { + ImmutableOpenMap mappings = map.get(stringObjectCursor.value); + for (ObjectObjectCursor cursor : mappings) { + String mappingName = cursor.key; + MappingMetaData mappingMetaData = cursor.value; + checkMapping(index, mappingName, mappingMetaData); + } + }); + } + private static String findSettingsFrom(String string) throws IOException { if (string == null) { return null; @@ -563,32 +580,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return result; } - public void checkMapping(String index) { - ensureClientIsPresent(); - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); - GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); - ImmutableOpenMap> map = getMappingsResponse.getMappings(); - map.keys().forEach((Consumer>) stringObjectCursor -> { - ImmutableOpenMap mappings = map.get(stringObjectCursor.value); - for (ObjectObjectCursor cursor : mappings) { - String mappingName = cursor.key; - MappingMetaData mappingMetaData = cursor.value; - checkMapping(index, mappingName, mappingMetaData); - } - }); - } - private void checkMapping(String index, String type, MappingMetaData mappingMetaData) { try { - SearchSourceBuilder builder = new SearchSourceBuilder(); - builder.query(QueryBuilders.matchAllQuery()); - builder.size(0); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(index); - searchRequest.types(type); - searchRequest.source(builder); - SearchResponse searchResponse = - client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); + SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setIndices(index) + .setTypes(type) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0); + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); long total = searchResponse.getHits().getTotalHits(); if (total > 0L) { Map fields = new TreeMap<>(); @@ -644,15 +643,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } else if ("type".equals(key)) { QueryBuilder filterBuilder = QueryBuilders.existsQuery(path); QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder); - SearchSourceBuilder builder = new SearchSourceBuilder(); - builder.query(queryBuilder); - builder.size(0); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(index); - searchRequest.types(type); - searchRequest.source(builder); - SearchResponse searchResponse = - client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); + SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setIndices(index) + .setTypes(type) + .setQuery(queryBuilder) + .setSize(0); + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); fields.put(path, searchResponse.getHits().getTotalHits()); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 0187b95..0a6127e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -50,7 +50,7 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void setClient(ElasticsearchClient client) { - logger.log(Level.INFO, "setting client = " + client); + logger.log(Level.DEBUG, "setting client = " + client); this.client = client; } @@ -65,8 +65,6 @@ public abstract class AbstractBasicClient implements BasicClient { logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); this.settings = settings; setClient(createClient(settings)); - } else { - logger.log(Level.WARN, "not initializing"); } } @@ -95,23 +93,24 @@ public abstract class AbstractBasicClient implements BasicClient { ensureClientIsPresent(); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + .timeout(timeout) + .waitForStatus(status); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name(); - if (logger.isErrorEnabled()) { - logger.error(message); - } + logger.error(message); throw new IllegalStateException(message); } } + @Override public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); logger.info("waiting for cluster shard settling"); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - //.waitForActiveShards(0) .waitForRelocatingShards(0) .timeout(timeout); ClusterHealthResponse healthResponse = @@ -194,9 +193,6 @@ public abstract class AbstractBasicClient implements BasicClient { } protected void ensureClientIsPresent() { - if (this instanceof MockAdminClient) { - return; - } if (client == null) { throw new IllegalStateException("no client"); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index d2adf36..073faf3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -4,7 +4,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushAction; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -17,12 +17,9 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -34,8 +31,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName()); - private BulkMetric bulkMetric; - private BulkController bulkController; private final AtomicBoolean closed = new AtomicBoolean(true); @@ -44,21 +39,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void init(Settings settings) throws IOException { if (closed.compareAndSet(true, false)) { super.init(settings); - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); - bulkMetric = new DefaultBulkMetric(); - bulkMetric.init(settings); - bulkController = new DefaultBulkController(this, bulkMetric); + bulkController = new DefaultBulkController(this); + logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(',')); bulkController.init(settings); - } else { - logger.log(Level.WARN, "not initializing"); } } - @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - @Override public BulkController getBulkController() { return bulkController; @@ -75,10 +61,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void close() throws IOException { if (closed.compareAndSet(false, true)) { ensureClientIsPresent(); - if (bulkMetric != null) { - logger.info("closing bulk metric"); - bulkMetric.close(); - } if (bulkController != null) { logger.info("closing bulk controller"); bulkController.close(); @@ -98,40 +80,50 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void newIndex(String index) throws IOException { - newIndex(index, Settings.EMPTY, (Map) null); + newIndex(index, Settings.EMPTY, (XContentBuilder) null); } @Override public void newIndex(String index, Settings settings) throws IOException { - newIndex(index, settings, (Map) null); - } - - @Override - public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { - String mappingString = builder.string(); - Map mappings = JsonXContent.jsonXContent.createParser(mappingString).mapOrdered(); - newIndex(index, settings, mappings); + newIndex(index, settings, (XContentBuilder) null); } @Override public void newIndex(String index, Settings settings, Map mapping) throws IOException { + if (mapping == null || mapping.isEmpty()) { + newIndex(index, settings, (XContentBuilder) null); + } else { + newIndex(index, settings, JsonXContent.contentBuilder().map(mapping)); + } + } + + @Override + public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { if (index == null) { - logger.warn("no index name given to create index"); + logger.warn("unable to create index, no index name given"); return; } ensureClientIsPresent(); - waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); - CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index); + CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) + .setIndex(index); if (settings != null) { - createIndexRequest.settings(settings); + createIndexRequestBuilder.setSettings(settings); } - if (mapping != null) { - createIndexRequest.mapping(TYPE_NAME, mapping); + if (builder != null) { + createIndexRequestBuilder.addMapping(TYPE_NAME, builder); + logger.debug("adding mapping = {}", builder.string()); + } else { + // empty mapping + createIndexRequestBuilder.addMapping(TYPE_NAME, + JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject()); + logger.debug("empty mapping"); + } + CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); + if (createIndexResponse.isAcknowledged()) { + logger.info("index {} created", index); + } else { + logger.warn("index creation of {} not acknowledged", index); } - CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); - XContentBuilder builder = XContentFactory.jsonBuilder(); - logger.info("index {} created: {}", index, - createIndexResponse.toString()); } @Override @@ -142,36 +134,40 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException { - if (bulkController != null) { - ensureClientIsPresent(); - bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); - } + ensureClientIsPresent(); + bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); } @Override public void stopBulk(IndexDefinition indexDefinition) throws IOException { - if (bulkController != null) { - ensureClientIsPresent(); - bulkController.stopBulkMode(indexDefinition); - } + ensureClientIsPresent(); + bulkController.stopBulkMode(indexDefinition); } @Override public void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { - if (bulkController != null) { - ensureClientIsPresent(); - bulkController.stopBulkMode(index, timeout, timeUnit); - } + ensureClientIsPresent(); + bulkController.stopBulkMode(index, timeout, timeUnit); } @Override public BulkClient index(String index, String id, boolean create, String source) { - return index(index, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); + return index(new IndexRequest() + .index(index) + .type(TYPE_NAME) + .id(id) + .create(create) + .source(source)); // will be converted into a bytes reference } @Override public BulkClient index(String index, String id, boolean create, BytesReference source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source)); + return index(new IndexRequest() + .index(index) + .type(TYPE_NAME) + .id(id) + .create(create) + .source(source)); } @Override @@ -183,7 +179,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient delete(String index, String id) { - return delete(new DeleteRequest(index, TYPE_NAME, id)); + return delete(new DeleteRequest() + .index(index) + .type(TYPE_NAME) + .id(id)); } @Override @@ -194,15 +193,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public BulkClient update(String index, String id, BytesReference source) { - return update(new UpdateRequest(index, TYPE_NAME, id) - .doc(source, XContentType.JSON)); + public BulkClient update(String index, String id, String source) { + return update(index, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); } @Override - public BulkClient update(String index, String id, String source) { - return update(new UpdateRequest(index, TYPE_NAME, id) - .doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON)); + public BulkClient update(String index, String id, BytesReference source) { + return update(new UpdateRequest() + .index(index) + .type(TYPE_NAME) + .id(id) + .doc(source.hasArray() ? source.array() : source.toBytes())); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 4dfd3b7..0be1268 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -1,6 +1,7 @@ package org.xbib.elx.common; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; @@ -15,9 +16,12 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.xbib.elx.api.SearchClient; +import org.xbib.elx.api.SearchMetric; +import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.Optional; @@ -30,11 +34,43 @@ import java.util.stream.StreamSupport; public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { + private SearchMetric searchMetric; + + @Override + public SearchMetric getSearchMetric() { + return searchMetric; + } + + @Override + public void init(Settings settings) throws IOException { + super.init(settings); + this.searchMetric = new DefaultSearchMetric(); + searchMetric.init(settings); + } + + @Override + public void close() throws IOException { + super.close(); + if (searchMetric != null) { + searchMetric.close(); + } + } + @Override public Optional get(Consumer getRequestBuilderConsumer) { GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); getRequestBuilderConsumer.accept(getRequestBuilder); - GetResponse getResponse = getRequestBuilder.execute().actionGet(); + ActionFuture actionFuture = getRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + GetResponse getResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + if (getResponse.isExists()) { + searchMetric.getSucceededQueries().inc(); + } else { + searchMetric.getEmptyQueries().inc(); + } return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); } @@ -42,23 +78,46 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement public Optional multiGet(Consumer multiGetRequestBuilderConsumer) { MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); - MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.execute().actionGet(); - return multiGetItemResponse.getResponses().length == 0 ? Optional.empty() : Optional.of(multiGetItemResponse); + ActionFuture actionFuture = multiGetRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + boolean isempty = multiGetItemResponse.getResponses().length == 0; + if (isempty) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } + return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); } @Override public Optional search(Consumer queryBuilder) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); - SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); + ActionFuture actionFuture = searchRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + SearchResponse searchResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); if (searchResponse.getFailedShards() > 0) { StringBuilder sb = new StringBuilder("Search failed:"); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { sb.append("\n").append(failure.reason()); } + searchMetric.getEmptyQueries().inc(); throw new ElasticsearchException(sb.toString()); } - return searchResponse.getHits().getHits().length == 0 ? Optional.empty() : Optional.of(searchResponse); + boolean isempty = searchResponse.getHits().getHits().length == 0; + if (isempty) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } + return isempty ? Optional.empty() : Optional.of(searchResponse); } @Override @@ -69,10 +128,25 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet(); Stream infiniteResponses = Stream.iterate(originalSearchResponse, - searchResponse -> new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) - .setScrollId(searchResponse.getScrollId()) - .setScroll(scrollTime) - .execute().actionGet()); + searchResponse -> { + SearchScrollRequestBuilder searchScrollRequestBuilder = + new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) + .setScrollId(searchResponse.getScrollId()) + .setScroll(scrollTime); + ActionFuture actionFuture1 = searchScrollRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + SearchResponse searchResponse1 = actionFuture1.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + boolean isempty = searchResponse1.getHits().getHits().length == 0; + if (isempty) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } + return searchResponse1; + }); Predicate condition = searchResponse -> searchResponse.getHits().getHits().length > 0; Consumer lastAction = searchResponse -> { ClearScrollRequestBuilder clearScrollRequestBuilder = diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index d2be48a..131e1d4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -27,10 +27,12 @@ public class DefaultBulkController implements BulkController { private static final Logger logger = LogManager.getLogger(DefaultBulkController.class); - private final BulkClient client; + private final BulkClient bulkClient; private final BulkMetric bulkMetric; + private BulkProcessor bulkProcessor; + private final List indexNames; private final Map startBulkRefreshIntervals; @@ -41,15 +43,11 @@ public class DefaultBulkController implements BulkController { private final TimeUnit maxWaitTimeUnit; - private BulkProcessor bulkProcessor; - - private BulkListener bulkListener; - private final AtomicBoolean active; - public DefaultBulkController(BulkClient client, BulkMetric bulkMetric) { - this.client = client; - this.bulkMetric = bulkMetric; + public DefaultBulkController(BulkClient bulkClient) { + this.bulkClient = bulkClient; + this.bulkMetric = new DefaultBulkMetric(); this.indexNames = new ArrayList<>(); this.active = new AtomicBoolean(false); this.startBulkRefreshIntervals = new HashMap<>(); @@ -65,11 +63,12 @@ public class DefaultBulkController implements BulkController { @Override public Throwable getLastBulkError() { - return bulkListener.getLastBulkError(); + return bulkProcessor.getBulkListener().getLastBulkError(); } @Override public void init(Settings settings) { + bulkMetric.init(settings); int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), @@ -81,8 +80,8 @@ public class DefaultBulkController implements BulkController { "maxVolumePerRequest")); boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), Parameters.ENABLE_BULK_LOGGING.getValue()); - this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); - this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener) + BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); + this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) .setFlushInterval(flushIngestInterval) @@ -117,7 +116,7 @@ public class DefaultBulkController implements BulkController { startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds); stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds); if (startRefreshIntervalInSeconds != 0L) { - client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s", + bulkClient.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s", 30L, TimeUnit.SECONDS); } } @@ -189,7 +188,7 @@ public class DefaultBulkController implements BulkController { if (indexNames.contains(index)) { Long secs = stopBulkRefreshIntervals.get(index); if (secs != null && secs != 0L) { - client.updateIndexSetting(index, "refresh_interval", secs + "s", + bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", 30L, TimeUnit.SECONDS); } indexNames.remove(index); @@ -207,11 +206,11 @@ public class DefaultBulkController implements BulkController { @Override public void close() throws IOException { flush(); - if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { + if (bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { for (String index : indexNames) { Long secs = stopBulkRefreshIntervals.get(index); if (secs != null && secs != 0L) - client.updateIndexSetting(index, "refresh_interval", secs + "s", + bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", 30L, TimeUnit.SECONDS); } indexNames.clear(); @@ -228,9 +227,5 @@ public class DefaultBulkController implements BulkController { if (bulkProcessor == null) { throw new UnsupportedOperationException("bulk processor not present"); } - if (bulkListener == null) { - throw new UnsupportedOperationException("bulk listener not present"); - } } - } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 49a54bf..db0ea42 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -11,7 +11,7 @@ import org.xbib.elx.api.BulkMetric; public class DefaultBulkListener implements BulkListener { - private final Logger logger = LogManager.getLogger(BulkListener.class.getName()); + private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName()); private final BulkController bulkController; @@ -31,15 +31,12 @@ public class DefaultBulkListener implements BulkListener { @Override public void beforeBulk(long executionId, BulkRequest request) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().inc(); - int n = request.numberOfActions(); - bulkMetric.getSubmitted().inc(n); - bulkMetric.getCurrentIngestNumDocs().inc(n); - bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); - } + long l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().inc(); + int n = request.numberOfActions(); + bulkMetric.getSubmitted().inc(n); + bulkMetric.getCurrentIngestNumDocs().inc(n); + bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); if (isBulkLoggingEnabled && logger.isDebugEnabled()) { logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", executionId, @@ -51,26 +48,19 @@ public class DefaultBulkListener implements BulkListener { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().dec(); - bulkMetric.getSucceeded().inc(response.getItems().length); - } + long l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().dec(); + bulkMetric.getSucceeded().inc(response.getItems().length); int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); - } + bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); if (itemResponse.isFailed()) { n++; - if (bulkMetric != null) { - bulkMetric.getSucceeded().dec(1); - bulkMetric.getFailed().inc(1); - } + bulkMetric.getSucceeded().dec(1); + bulkMetric.getFailed().inc(1); } } - if (isBulkLoggingEnabled && bulkMetric != null && logger.isDebugEnabled()) { + if (isBulkLoggingEnabled && logger.isDebugEnabled()) { logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", executionId, bulkMetric.getSucceeded().getCount(), @@ -84,17 +74,13 @@ public class DefaultBulkListener implements BulkListener { executionId, n, response.buildFailureMessage()); } } else { - if (bulkMetric != null) { - bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); - } + bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(); - } + bulkMetric.getCurrentIngest().dec(); lastBulkError = failure; if (logger.isErrorEnabled()) { logger.error("after bulk [" + executionId + "] error", failure); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 8127e29..fc3c14b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -44,6 +44,12 @@ public class DefaultBulkMetric implements BulkMetric { start(); } + + @Override + public void markTotalIngest(long n) { + totalIngest.mark(n); + } + @Override public Metered getTotalIngest() { return totalIngest; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 1de6169..a0124ec 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -24,8 +24,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** - * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request - * (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk + * A bulk processor is a thread safe bulk processing class, allowing to easily + * set when to "flush" a new bulk request + * (either based on number of actions, based on the size, or time), and + * to easily control the number of concurrent bulk * requests allowed to be executed in parallel. * In order to create a new bulk processor, use the {@link Builder}. */ @@ -78,11 +80,9 @@ public class DefaultBulkProcessor implements BulkProcessor { } } - public static Builder builder(ElasticsearchClient client, - BulkListener listener) { - Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null"); - Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null"); - return new Builder(client, listener); + public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) { + Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null"); + return new Builder(client, bulkListener); } @Override @@ -149,9 +149,8 @@ public class DefaultBulkProcessor implements BulkProcessor { * @param request request * @return his bulk processor */ - @SuppressWarnings("rawtypes") @Override - public DefaultBulkProcessor add(ActionRequest request) { + public DefaultBulkProcessor add(ActionRequest request) { internalAdd(request); return this; } @@ -335,24 +334,25 @@ public class DefaultBulkProcessor implements BulkProcessor { private final ElasticsearchClient client; - private final BulkListener listener; + private final BulkListener bulkListener; - SyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener) { + SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) { + Objects.requireNonNull(bulkListener, "A listener is required for SyncBulkRequestHandler but null"); this.client = client; - this.listener = listener; + this.bulkListener = bulkListener; } @Override public void execute(BulkRequest bulkRequest, long executionId) { boolean afterCalled = false; try { - listener.beforeBulk(executionId, bulkRequest); + bulkListener.beforeBulk(executionId, bulkRequest); BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet(); afterCalled = true; - listener.afterBulk(executionId, bulkRequest, bulkResponse); + bulkListener.afterBulk(executionId, bulkRequest, bulkResponse); } catch (Exception e) { if (!afterCalled) { - listener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, bulkRequest, e); } } } @@ -367,15 +367,16 @@ public class DefaultBulkProcessor implements BulkProcessor { private final ElasticsearchClient client; - private final BulkListener listener; + private final BulkListener bulkListener; private final Semaphore semaphore; private final int concurrentRequests; - private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener, int concurrentRequests) { + private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) { + Objects.requireNonNull(bulkListener, "A listener is required for AsyncBulkRequestHandler but null"); this.client = client; - this.listener = listener; + this.bulkListener = bulkListener; this.concurrentRequests = concurrentRequests; this.semaphore = new Semaphore(concurrentRequests); } @@ -385,14 +386,14 @@ public class DefaultBulkProcessor implements BulkProcessor { boolean bulkRequestSetupSuccessful = false; boolean acquired = false; try { - listener.beforeBulk(executionId, bulkRequest); + bulkListener.beforeBulk(executionId, bulkRequest); semaphore.acquire(); acquired = true; client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { try { - listener.afterBulk(executionId, bulkRequest, response); + bulkListener.afterBulk(executionId, bulkRequest, response); } finally { semaphore.release(); } @@ -401,7 +402,7 @@ public class DefaultBulkProcessor implements BulkProcessor { @Override public void onFailure(Throwable e) { try { - listener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, bulkRequest, e); } finally { semaphore.release(); } @@ -410,9 +411,9 @@ public class DefaultBulkProcessor implements BulkProcessor { bulkRequestSetupSuccessful = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - listener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, bulkRequest, e); } catch (Exception e) { - listener.afterBulk(executionId, bulkRequest, e); + bulkListener.afterBulk(executionId, bulkRequest, e); } finally { if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java new file mode 100644 index 0000000..5f164b3 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -0,0 +1,97 @@ +package org.xbib.elx.common; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.Settings; +import org.xbib.elx.api.SearchMetric; +import org.xbib.metrics.api.Count; +import org.xbib.metrics.api.Metered; +import org.xbib.metrics.common.CountMetric; +import org.xbib.metrics.common.Meter; +import java.util.concurrent.Executors; + +public class DefaultSearchMetric implements SearchMetric { + + private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName()); + + private final Meter totalQuery; + + private final Count currentQuery; + + private final Count queries; + + private final Count succeededQueries; + + private final Count emptyQueries; + + private Long started; + + private Long stopped; + + public DefaultSearchMetric() { + totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor()); + currentQuery = new CountMetric(); + queries = new CountMetric(); + succeededQueries = new CountMetric(); + emptyQueries = new CountMetric(); + } + + @Override + public void init(Settings settings) { + logger.info("init"); + start(); + } + + @Override + public void markTotalQueries(long n) { + totalQuery.mark(n); + } + + @Override + public Metered getTotalQueries() { + return totalQuery; + } + + @Override + public Count getCurrentQueries() { + return currentQuery; + } + + @Override + public Count getQueries() { + return queries; + } + + @Override + public Count getSucceededQueries() { + return succeededQueries; + } + + @Override + public Count getEmptyQueries() { + return emptyQueries; + } + + @Override + public long elapsed() { + return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L; + } + + @Override + public void start() { + this.started = System.nanoTime(); + totalQuery.start(5L); + } + + @Override + public void stop() { + this.stopped = System.nanoTime(); + totalQuery.stop(); + } + + @Override + public void close() { + stop(); + totalQuery.shutdown(); + } +} diff --git a/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.BulkClientProvider b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.BulkClientProvider new file mode 100644 index 0000000..23d86e7 --- /dev/null +++ b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.BulkClientProvider @@ -0,0 +1 @@ +org.xbib.elx.common.MockBulkClientProvider \ No newline at end of file diff --git a/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.SearchClientProvider b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.SearchClientProvider new file mode 100644 index 0000000..ec8036b --- /dev/null +++ b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.SearchClientProvider @@ -0,0 +1 @@ +org.xbib.elx.common.MockSearchClientProvider \ No newline at end of file diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/MockAdminClientProviderTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/MockAdminClientProviderTest.java deleted file mode 100644 index c7123e8..0000000 --- a/elx-common/src/test/java/org/xbib/elx/common/test/MockAdminClientProviderTest.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.xbib.elx.common.test; - -import org.junit.jupiter.api.Test; -import org.xbib.elx.common.ClientBuilder; -import org.xbib.elx.common.MockAdminClient; -import org.xbib.elx.common.MockAdminClientProvider; - -import java.io.IOException; - -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class MockAdminClientProviderTest { - - @Test - void testMockAdminProvider() throws IOException { - MockAdminClient client = ClientBuilder.builder() - .setAdminClientProvider(MockAdminClientProvider.class) - .build(); - assertNotNull(client); - } -} diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/MockClientProviderTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/MockClientProviderTest.java new file mode 100644 index 0000000..51fe18a --- /dev/null +++ b/elx-common/src/test/java/org/xbib/elx/common/test/MockClientProviderTest.java @@ -0,0 +1,41 @@ +package org.xbib.elx.common.test; + +import org.junit.jupiter.api.Test; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.MockAdminClient; +import org.xbib.elx.common.MockAdminClientProvider; +import org.xbib.elx.common.MockBulkClient; +import org.xbib.elx.common.MockBulkClientProvider; +import org.xbib.elx.common.MockSearchClient; +import org.xbib.elx.common.MockSearchClientProvider; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class MockClientProviderTest { + + @Test + void testMockAdminClientProvider() throws IOException { + MockAdminClient client = ClientBuilder.builder() + .setAdminClientProvider(MockAdminClientProvider.class) + .build(); + assertNotNull(client); + } + + @Test + void testMockBulkClientProvider() throws IOException { + MockBulkClient client = ClientBuilder.builder() + .setBulkClientProvider(MockBulkClientProvider.class) + .build(); + assertNotNull(client); + } + + @Test + void testMockSearchClientProvider() throws IOException { + MockSearchClient client = ClientBuilder.builder() + .setSearchClientProvider(MockSearchClientProvider.class) + .build(); + assertNotNull(client); + } +} diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java index 2bc1f44..95ab468 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java @@ -6,13 +6,12 @@ import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -54,16 +53,14 @@ class SearchTest { client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet(); for (int i = 0; i < 1; i++) { QueryBuilder queryStringBuilder = QueryBuilders.queryStringQuery("rs:" + 1234); - SearchSourceBuilder searchSource = new SearchSourceBuilder(); - searchSource.query(queryStringBuilder); - searchSource.sort("rowcount", SortOrder.DESC); - searchSource.from(i * 10); - searchSource.size(10); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices("pages"); - searchRequest.types("row"); - searchRequest.source(searchSource); - SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); + SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setIndices("pages") + .setTypes("row") + .setQuery(queryStringBuilder) + .addSort("rowcount", SortOrder.DESC) + .setFrom(i * 10) + .setSize(10); + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); assertTrue(searchResponse.getHits().getTotalHits() > 0); } } diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java index cb56250..c1bdb9a 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java @@ -1,7 +1,5 @@ package org.xbib.elx.common.test; -import static org.junit.jupiter.api.Assertions.assertEquals; - import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; @@ -19,6 +17,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import static org.junit.jupiter.api.Assertions.assertEquals; @ExtendWith(TestExtension.class) class SimpleTest { diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java index 139f560..2b562f7 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java @@ -46,7 +46,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - private static final String key = "es-instance"; + private static final String key = "es-instance-"; private static final AtomicInteger count = new AtomicInteger(0); @@ -73,17 +73,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode("1"); - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); - Object obj = response.iterator().next().getTransport().getAddress() - .publishAddress(); - String host = null; - int port = 0; - if (obj instanceof InetSocketTransportAddress) { - InetSocketTransportAddress address = (InetSocketTransportAddress) obj; - host = address.address().getHostName(); - port = address.address().getPort(); - } try { ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) @@ -99,7 +88,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft ClusterStateResponse clusterStateResponse = helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); - logger.info("host = {} port = {}", host, port); } @Override @@ -107,12 +95,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Helper helper = extensionContext.getParent().get().getStore(ns) .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); closeNodes(helper); - deleteFiles(Paths.get(helper.getHome() + "/data")); - logger.info("data files wiped"); + deleteFiles(Paths.get(helper.getHome())); + logger.info("files wiped"); Thread.sleep(2000L); // let OS commit changes } - private void closeNodes(Helper helper) throws IOException { + private void closeNodes(Helper helper) { logger.info("closing all clients"); for (AbstractClient client : helper.clients.values()) { client.close(); @@ -128,7 +116,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static void deleteFiles(Path directory) throws IOException { if (Files.exists(directory)) { - Files.walkFileTree(directory, new SimpleFileVisitor() { + Files.walkFileTree(directory, new SimpleFileVisitor<>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); @@ -146,7 +134,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Helper create() { Helper helper = new Helper(); - helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); + String home = System.getProperty("path.home", "build/elxnode"); + helper.setHome(home + "/" + helper.randomString(8)); helper.setClusterName("test-cluster-" + helper.randomString(8)); logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); return helper; @@ -158,6 +147,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft String cluster; + String host; + + int port; + Map nodes = new HashMap<>(); Map clients = new HashMap<>(); @@ -187,6 +180,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft void startNode(String id) { buildNode(id).start(); + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); + NodesInfoResponse response = client(id). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + Object obj = response.iterator().next().getTransport().getAddress() + .publishAddress(); + if (obj instanceof InetSocketTransportAddress) { + InetSocketTransportAddress address = (InetSocketTransportAddress) obj; + host = address.address().getHostName(); + port = address.address().getPort(); + } } ElasticsearchClient client(String id) { diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java index 39beedb..4fa10e0 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java @@ -5,12 +5,11 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -52,13 +51,11 @@ class WildcardTest { } private long count(ElasticsearchClient client, QueryBuilder queryBuilder) { - SearchSourceBuilder builder = new SearchSourceBuilder() - .query(queryBuilder); - SearchRequest searchRequest = new SearchRequest() - .indices("index") - .types("type") - .source(builder); - return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits(); + SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setIndices("index") + .setTypes("type") + .setQuery(queryBuilder); + return searchRequestBuilder.execute().actionGet().getHits().getTotalHits(); } private void validateCount(ElasticsearchClient client, QueryBuilder queryBuilder, long expectedHits) { diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java index ab5f171..9d006c1 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java @@ -1 +1,4 @@ +/** + * + */ package org.xbib.elx.common.test; diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index 97a5d24..ed6937f 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -3,6 +3,7 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractAdminClient; +import java.io.IOException; public class NodeAdminClient extends AbstractAdminClient { @@ -13,12 +14,12 @@ public class NodeAdminClient extends AbstractAdminClient { } @Override - public ElasticsearchClient createClient(Settings settings) { + protected ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @Override - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { helper.closeClient(settings); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index 019fab1..d493976 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -3,6 +3,7 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractBulkClient; +import java.io.IOException; public class NodeBulkClient extends AbstractBulkClient { @@ -13,12 +14,12 @@ public class NodeBulkClient extends AbstractBulkClient { } @Override - public ElasticsearchClient createClient(Settings settings) { + protected ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @Override - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { helper.closeClient(settings); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index 411d311..6e429f7 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -8,6 +8,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; +import org.xbib.elx.common.Parameters; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,7 +36,7 @@ public class NodeClientHelper { key -> innerCreateClient(settings)); } - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); if (client != null) { logger.debug("closing node..."); @@ -49,20 +51,38 @@ public class NodeClientHelper { + " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.runtime.version") + " " + System.getProperty("java.vm.version"); - Settings effectiveSettings = Settings.builder().put(settings) + Settings effectiveSettings = Settings.builder() + .put(filterSettings(settings)) .put("node.client", true) .put("node.master", false) .put("node.data", false) .put("path.home", settings.get("path.home")) .build(); logger.info("creating node client on {} with effective settings {}", - version, effectiveSettings.getAsMap()); + version, effectiveSettings.toDelimitedString(',')); Collection> plugins = Collections.emptyList(); node = new BulkNode(new Environment(effectiveSettings), plugins); node.start(); return node.client(); } + private static Settings filterSettings(Settings settings) { + Settings.Builder builder = Settings.builder(); + for (Map.Entry entry : settings.getAsMap().entrySet()) { + if (!isPrivateSettings(entry.getKey())) { + builder.put(entry.getKey(), entry.getValue()); + } + } + return builder.build(); + } + + private static boolean isPrivateSettings(String key) { + return key.equals(Parameters.MAX_ACTIONS_PER_REQUEST.name()) || + key.equals(Parameters.MAX_CONCURRENT_REQUESTS.name()) || + key.equals(Parameters.MAX_VOLUME_PER_REQUEST.name()) || + key.equals(Parameters.FLUSH_INTERVAL.name()); + } + private static class BulkNode extends Node { BulkNode(Environment env, Collection> classpathPlugins) { diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index 9248ac7..7591c17 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -3,6 +3,7 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractSearchClient; +import java.io.IOException; public class NodeSearchClient extends AbstractSearchClient { @@ -13,12 +14,12 @@ public class NodeSearchClient extends AbstractSearchClient { } @Override - public ElasticsearchClient createClient(Settings settings) { + protected ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @Override - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { helper.closeClient(settings); } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index ffbe41b..7d81298 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -41,45 +41,42 @@ class BulkClientTest { @Test void testSingleDoc() throws Exception { - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) - .build(); - try { + .build()) { bulkClient.newIndex("test"); bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - } finally { - assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.close(); } } @Test void testNewIndex() throws Exception { - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) - .build(); - bulkClient.newIndex("test"); - bulkClient.close(); + .build()) { + bulkClient.newIndex("test"); + } } @Test void testMapping() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { @@ -101,13 +98,12 @@ class BulkClientTest { @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .build(); - try { + .build()) { bulkClient.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { bulkClient.index("test", null, false, @@ -115,15 +111,13 @@ class BulkClientTest { } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - } finally { - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); } } @@ -133,14 +127,13 @@ class BulkClientTest { Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions); - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .build(); - try { + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -171,15 +164,13 @@ class BulkClientTest { logger.warn("latch timeout"); } bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); - assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount()); - } finally { + assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + bulkClient.refreshIndex("test"); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index d694ee5..2856934 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -33,7 +33,7 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) @@ -47,7 +47,7 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 1e8670f..f6e1c2f 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -33,11 +33,11 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 0c79ffb..615fe82 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -33,11 +33,11 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 33dd290..0ca18f9 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -36,13 +36,11 @@ class SearchTest { @Test void testDocStream() throws Exception { - long numactions = ACTIONS; - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .build(); - try (bulkClient) { + .build()) { bulkClient.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { bulkClient.index("test", null, false, @@ -51,14 +49,14 @@ class SearchTest { bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + assertEquals(ACTIONS, bulkClient.getSearchableDocs("test")); + assertEquals(ACTIONS, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); } - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); - } - assertNull(bulkClient.getBulkController().getLastBulkError()); - try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1")) + try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client) .setSearchClientProvider(NodeSearchClientProvider.class) .put(helper.getNodeSettings()) .build()) { @@ -67,7 +65,7 @@ class SearchTest { .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMinutes(1), 10); long count = stream.count(); - assertEquals(numactions, count); + assertEquals(ACTIONS, count); Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); @@ -76,7 +74,10 @@ class SearchTest { logger.info(id); idcount.incrementAndGet(); }); - assertEquals(numactions, idcount.get()); + assertEquals(ACTIONS, idcount.get()); + assertEquals(11, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(9, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index d747566..351e025 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -30,23 +30,28 @@ class SmokeTest { @Test void smokeTest() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + adminClient.buildIndexDefinitionFromSettings("test_smoke_definition", Settings.EMPTY); assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); + logger.info("new index: done"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + logger.info("index doc: done"); bulkClient.flush(); + logger.info("flush: done"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); + logger.info("wait: done"); adminClient.checkMapping("test_smoke"); + logger.info("check mapping: done"); bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); bulkClient.delete("test_smoke", "1"); bulkClient.flush(); @@ -56,6 +61,7 @@ class SmokeTest { bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.deleteIndex("test_smoke"); + logger.info("delete index: done"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.flush(); @@ -63,13 +69,14 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); adminClient.deleteIndex(indexDefinition); + logger.info("done"); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index 991cb43..8b46b96 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; @@ -33,8 +32,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; -import java.util.HashMap; -import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -42,11 +40,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final Logger logger = LogManager.getLogger("test"); - private static final Random random = new Random(); - - private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - - private static final String key = "es-instance"; + private static final String key = "es-instance-"; private static final AtomicInteger count = new AtomicInteger(0); @@ -62,73 +56,35 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - // initialize new helper here, increase counter - return extensionContext.getParent().get().getStore(ns) - .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class); + return extensionContext.getParent().isPresent() ? + extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class) : null; } @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - Helper helper = extensionContext.getParent().get().getStore(ns) - .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + Helper helper = extensionContext.getParent().isPresent() ? + extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; + Objects.requireNonNull(helper); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); - helper.startNode("1"); - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); - Object obj = response.iterator().next().getTransport().getAddress() - .publishAddress(); - String host = null; - int port = 0; - if (obj instanceof InetSocketTransportAddress) { - InetSocketTransportAddress address = (InetSocketTransportAddress) obj; - host = address.address().getHostName(); - port = address.address().getPort(); - } - try { - ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); - } - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); - ClusterStateResponse clusterStateResponse = - helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); - logger.info("host = {} port = {}", host, port); + helper.startNode(); + helper.greenHealth(); + logger.info("cluser name = {}", helper.clusterName()); } @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - Helper helper = extensionContext.getParent().get().getStore(ns) - .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); - closeNodes(helper); - deleteFiles(Paths.get(helper.getHome() + "/data")); - logger.info("data files wiped"); - Thread.sleep(2000L); // let OS commit changes - } - - private void closeNodes(Helper helper) throws IOException { - logger.info("closing all clients"); - for (AbstractClient client : helper.clients.values()) { - client.close(); - } - logger.info("closing all nodes"); - for (Node node : helper.nodes.values()) { - if (node != null) { - node.close(); - } - } - logger.info("all nodes closed"); + Helper helper = extensionContext.getParent().isPresent() ? + extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; + Objects.requireNonNull(helper); + helper.closeNodes(); + deleteFiles(Paths.get(helper.getHome())); + logger.info("files wiped"); + Thread.sleep(1000L); // let OS commit changes } private static void deleteFiles(Path directory) throws IOException { if (Files.exists(directory)) { - Files.walkFileTree(directory, new SimpleFileVisitor() { + Files.walkFileTree(directory, new SimpleFileVisitor<>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); @@ -146,7 +102,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Helper create() { Helper helper = new Helper(); - helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); + String home = System.getProperty("path.home", "build/elxnode"); + helper.setHome(home + "/" + helper.randomString(8)); helper.setClusterName("test-cluster-" + helper.randomString(8)); logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); return helper; @@ -158,9 +115,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft String cluster; - Map nodes = new HashMap<>(); + String host; - Map clients = new HashMap<>(); + int port; + + Node node; + + AbstractClient client; void setHome(String home) { this.home = home; @@ -180,18 +141,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Settings getNodeSettings() { return Settings.builder() - .put("name", "elx-client") // for Threadpool name + .put("name", "elx-client") // for threadpool name .put("cluster.name", getClusterName()) .put("path.home", getHome()) .build(); } - void startNode(String id) { - buildNode(id).start(); - } - - ElasticsearchClient client(String id) { - return clients.get(id); + void startNode() { + buildNode().start(); + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); + NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + Object obj = response.iterator().next().getTransport().getAddress().publishAddress(); + if (obj instanceof InetSocketTransportAddress) { + InetSocketTransportAddress address = (InetSocketTransportAddress) obj; + host = address.address().getHostName(); + port = address.address().getPort(); + logger.info("host = {} port = {}", host, port); + } } String randomString(int len) { @@ -203,16 +169,51 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return new String(buf); } - private Node buildNode(String id) { + Node buildNode() { + String id = "1"; Settings nodeSettings = Settings.builder() .put(getNodeSettings()) .put("node.name", id) .build(); - Node node = new MockNode(nodeSettings); - AbstractClient client = (AbstractClient) node.client(); - nodes.put(id, node); - clients.put(id, client); + node = new MockNode(nodeSettings); + client = (AbstractClient) node.client(); return node; } + + void closeNodes() { + if (client != null) { + logger.info("closing client"); + client.close(); + } + if (node != null) { + logger.info("closing all nodes"); + node.close(); + } + } + + void greenHealth() throws IOException { + try { + ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, + new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) + .timeout(TimeValue.timeValueSeconds(30))).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + throw new IOException("cluster state is " + healthResponse.getStatus().name() + + ", from here on, everything will fail!"); + } + } catch (ElasticsearchTimeoutException e) { + throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); + } + } + + String clusterName() { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); + ClusterStateResponse clusterStateResponse = + client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + return clusterStateResponse.getClusterName().value(); + } + + private static final Random random = new Random(); + + private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java b/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java new file mode 100644 index 0000000..662c94e --- /dev/null +++ b/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java @@ -0,0 +1,4 @@ +/** + * + */ +package org.xbib.elx.node.test; diff --git a/elx-node/src/test/resources/log4j2-test.xml b/elx-node/src/test/resources/log4j2-test.xml index 6c323f8..11bffcf 100644 --- a/elx-node/src/test/resources/log4j2-test.xml +++ b/elx-node/src/test/resources/log4j2-test.xml @@ -10,4 +10,4 @@ - \ No newline at end of file + diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index f06a5d2..e66c483 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -19,8 +19,8 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public ElasticsearchClient createClient(Settings settings) { - return helper.createClient(settings, null); + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings); } @Override @@ -30,7 +30,7 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index a10b22b..3f49942 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -18,8 +18,8 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public ElasticsearchClient createClient(Settings settings) { - return helper.createClient(settings, null); + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings); } @Override @@ -29,7 +29,7 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index f9c1573..5fe7c68 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -29,23 +29,16 @@ public class TransportClientHelper { private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName()); - private static Object configurationObject; - private static final Map clientMap = new HashMap<>(); - public ElasticsearchClient createClient(Settings settings, Object object) { - if (configurationObject == null && object != null) { - configurationObject = object; - } - if (configurationObject instanceof ElasticsearchClient) { - return (ElasticsearchClient) configurationObject; - } - return clientMap.computeIfAbsent(settings.get("cluster.name"), - key -> innerCreateClient(settings)); + public ElasticsearchClient createClient(Settings settings) { + String clusterName = settings.get("cluster.name", "elasticsearch"); + return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); } public void closeClient(Settings settings) { - ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); + String clusterName = settings.get("cluster.name", "elasticsearch"); + ElasticsearchClient client = clientMap.remove(clusterName); if (client != null) { if (client instanceof Client) { ((Client) client).close(); @@ -57,8 +50,8 @@ public class TransportClientHelper { public void init(TransportClient transportClient, Settings settings) throws IOException { Collection addrs = findAddresses(settings); if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { - throw new NoNodeAvailableException("no cluster nodes available, check settings " - + settings.getAsMap()); + throw new NoNodeAvailableException("no cluster nodes available, check settings = " + + settings.toDelimitedString(',')); } } @@ -77,12 +70,13 @@ public class TransportClientHelper { } catch (NumberFormatException e) { logger.warn(e.getMessage(), e); } - } - if (splitHost.length == 1) { + } else if (splitHost.length == 1) { String host = splitHost[0]; InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); addresses.add(address); + } else { + throw new IOException("invalid hostname specification: " + hostname); } } return addresses; @@ -96,47 +90,45 @@ public class TransportClientHelper { logger.info("connected to nodes = {}", nodes); if (nodes != null && !nodes.isEmpty()) { if (autodiscover) { - logger.debug("trying to auto-discover all nodes..."); + logger.debug("trying to discover all nodes..."); ClusterStateRequestBuilder clusterStateRequestBuilder = new ClusterStateRequestBuilder(transportClient, ClusterStateAction.INSTANCE); ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet(); DiscoveryNodes discoveryNodes = clusterStateResponse.getState().getNodes(); - addDiscoveryNodes(transportClient, discoveryNodes); - logger.info("after auto-discovery: connected to {}", transportClient.connectedNodes()); + for (DiscoveryNode discoveryNode : discoveryNodes) { + transportClient.addTransportAddress(discoveryNode.getAddress()); + } + logger.info("after discovery: connected to {}", transportClient.connectedNodes()); } return true; } return false; } - private void addDiscoveryNodes(TransportClient transportClient, DiscoveryNodes discoveryNodes) { - for (DiscoveryNode discoveryNode : discoveryNodes) { - transportClient.addTransportAddress(discoveryNode.getAddress()); - } - } - private ElasticsearchClient innerCreateClient(Settings settings) { String systemIdentifier = System.getProperty("os.name") + " " + System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.version") + " Elasticsearch " + Version.CURRENT.toString(); - Settings effectiveSettings = Settings.builder() - // for thread pool size - .put("processors", - settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) + logger.info("creating transport client on {} with custom settings {}", + systemIdentifier, settings.getAsMap()); + // we need to disable dead lock check because we may have mixed node/transport clients + DefaultChannelFuture.setUseDeadLockChecker(false); + return TransportClient.builder() + .settings(getTransportClientSettings(settings)) + .build(); + } + + private Settings getTransportClientSettings(Settings settings) { + return Settings.builder() + .put("cluster.name", settings.get("cluster.name", "elasticsearch")) + .put("path.home", settings.get("path.home", ".")) + .put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) // for thread pool size .put("client.transport.sniff", false) // do not sniff .put("client.transport.nodes_sampler_interval", "1m") // do not ping .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect .put("client.transport.ignore_cluster_name", true) // connect to any cluster - // custom settings may override defaults - .put(settings) .build(); - logger.info("creating transport client on {} with custom settings {} and effective settings {}", - systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); - - // we need to disable dead lock check because we may have mixed node/transport clients - DefaultChannelFuture.setUseDeadLockChecker(false); - return TransportClient.builder().settings(effectiveSettings).build(); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index 85636f0..87d5767 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -19,7 +19,7 @@ public class TransportSearchClient extends AbstractSearchClient { @Override public ElasticsearchClient createClient(Settings settings) throws IOException { - return helper.createClient(settings, null); + return helper.createClient(settings); } @Override @@ -29,7 +29,7 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { helper.closeClient(settings); } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 42cd669..4089d89 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -2,7 +2,6 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -42,35 +41,32 @@ class BulkClientTest { @Test void testSingleDoc() throws Exception { - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) - .build(); - try { + .build()) { bulkClient.newIndex("test"); bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - } finally { - assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.close(); } } @Test void testNewIndex() throws Exception { - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .build(); - bulkClient.newIndex("test"); - bulkClient.close(); + .build()) { + bulkClient.newIndex("test"); + } } @Test @@ -80,9 +76,9 @@ class BulkClientTest { .put(helper.getTransportSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build()) { + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("doc") @@ -101,13 +97,12 @@ class BulkClientTest { @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .build(); - try { + .build()) { bulkClient.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { bulkClient.index("test", null, false, @@ -115,35 +110,30 @@ class BulkClientTest { } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); } } @Test - void testThreadedRandomDocs() throws Exception { + void testThreadedRandomDocs() { int maxthreads = Runtime.getRuntime().availableProcessors(); Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions); - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads * 2) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) .put(Parameters.ENABLE_BULK_LOGGING.name(), "true") - .build(); - try { + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -175,19 +165,15 @@ class BulkClientTest { logger.warn("latch timeout"); } bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); - assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount()); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { + assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + bulkClient.refreshIndex("test"); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 10b90fa..96a2e61 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -20,7 +20,7 @@ class DuplicateIDTest { private static final Long ACTIONS = 100L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + private static final Long MAX_ACTIONS_PER_REQUEST = 5L; private final TestExtension.Helper helper; @@ -45,7 +45,7 @@ class DuplicateIDTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index cfd10af..d6cfd7a 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -33,7 +33,7 @@ class IndexShiftTest { this.helper = helper; } - @Test + @Test void testIndexShift() throws Exception { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index e42c9b1..1aec9b1 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -39,12 +39,11 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .build(); - try (bulkClient) { + .build()) { bulkClient.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { bulkClient.index("test", null, false, @@ -54,12 +53,12 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); } - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); - } - assertNull(bulkClient.getBulkController().getLastBulkError()); try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) .put(helper.getTransportSettings()) diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index f55d3be..53e5758 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -28,7 +28,6 @@ class SmokeTest { this.helper = helper; } - @Test void smokeTest() throws Exception { try (TransportAdminClient adminClient = ClientBuilder.builder() @@ -64,8 +63,8 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 7867e42..2bda899 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; @@ -33,8 +32,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; -import java.util.HashMap; -import java.util.Map; +import java.security.SecureRandom; +import java.util.Objects; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -42,11 +41,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final Logger logger = LogManager.getLogger("test"); - private static final Random random = new Random(); - - private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - - private static final String key = "es-instance"; + private static final String key = "es-instance-"; private static final AtomicInteger count = new AtomicInteger(0); @@ -62,71 +57,35 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - // initialize new helper here, increase counter - return extensionContext.getParent().get().getStore(ns) - .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class); + return extensionContext.getParent().isPresent() ? + extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class) : null; } @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - Helper helper = extensionContext.getParent().get().getStore(ns) - .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + Helper helper = extensionContext.getParent().isPresent() ? + extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; + Objects.requireNonNull(helper); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); - helper.startNode("1"); - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); - Object obj = response.iterator().next().getTransport().getAddress() - .publishAddress(); - if (obj instanceof InetSocketTransportAddress) { - InetSocketTransportAddress address = (InetSocketTransportAddress) obj; - helper.host = address.address().getHostName(); - helper.port = address.address().getPort(); - } - try { - ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); - } - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); - ClusterStateResponse clusterStateResponse = - helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); - logger.info("host = {} port = {}", helper.host, helper.port); + helper.startNode(); + helper.greenHealth(); + logger.info("cluster name = {}", helper.clusterName()); } @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - Helper helper = extensionContext.getParent().get().getStore(ns) - .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); - closeNodes(helper); - deleteFiles(Paths.get(helper.getHome() + "/data")); - logger.info("data files wiped"); - Thread.sleep(2000L); // let OS commit changes - } - - private void closeNodes(Helper helper) throws IOException { - logger.info("closing all clients"); - for (AbstractClient client : helper.clients.values()) { - client.close(); - } - logger.info("closing all nodes"); - for (Node node : helper.nodes.values()) { - if (node != null) { - node.close(); - } - } - logger.info("all nodes closed"); + Helper helper = extensionContext.getParent().isPresent() ? + extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; + Objects.requireNonNull(helper); + helper.closeNodes(); + deleteFiles(Paths.get(helper.getHome())); + logger.info("files wiped"); + Thread.sleep(1000L); // let OS commit changes } private static void deleteFiles(Path directory) throws IOException { if (Files.exists(directory)) { - Files.walkFileTree(directory, new SimpleFileVisitor() { + Files.walkFileTree(directory, new SimpleFileVisitor<>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); @@ -144,7 +103,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Helper create() { Helper helper = new Helper(); - helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); + String home = System.getProperty("path.home", "build/elxtransport"); + helper.setHome(home + "/" + helper.randomString(8)); helper.setClusterName("test-cluster-" + helper.randomString(8)); logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); return helper; @@ -160,9 +120,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft int port; - Map nodes = new HashMap<>(); + Node node; - Map clients = new HashMap<>(); + AbstractClient client; void setHome(String home) { this.home = home; @@ -196,12 +156,18 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .build(); } - void startNode(String id) { - buildNode(id).start(); - } - - ElasticsearchClient client(String id) { - return clients.get(id); + void startNode() { + buildNode().start(); + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); + NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + Object obj = response.iterator().next().getTransport().getAddress() + .publishAddress(); + if (obj instanceof InetSocketTransportAddress) { + InetSocketTransportAddress address = (InetSocketTransportAddress) obj; + host = address.address().getHostName(); + port = address.address().getPort(); + logger.info("host = {} port = {}", host, port); + } } String randomString(int len) { @@ -213,16 +179,51 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return new String(buf); } - private Node buildNode(String id) { + Node buildNode() { + String id = "1"; Settings nodeSettings = Settings.builder() .put(getNodeSettings()) .put("node.name", id) .build(); - Node node = new MockNode(nodeSettings); - AbstractClient client = (AbstractClient) node.client(); - nodes.put(id, node); - clients.put(id, client); + node = new MockNode(nodeSettings); + client = (AbstractClient) node.client(); return node; } + + void closeNodes() { + if (client != null) { + logger.info("closing client"); + client.close(); + } + if (node != null) { + logger.info("closing node"); + node.close(); + } + } + + void greenHealth() throws IOException { + try { + ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, + new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) + .timeout(TimeValue.timeValueSeconds(30))).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + throw new IOException("cluster state is " + healthResponse.getStatus().name() + + ", from here on, everything will fail!"); + } + } catch (ElasticsearchTimeoutException e) { + throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); + } + } + + String clusterName() { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); + ClusterStateResponse clusterStateResponse = + client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + return clusterStateResponse.getClusterName().value(); + } + + private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); + + private static final Random random = new SecureRandom(); } } diff --git a/elx-transport/src/test/resources/log4j2-test.xml b/elx-transport/src/test/resources/log4j2-test.xml index 6c323f8..11bffcf 100644 --- a/elx-transport/src/test/resources/log4j2-test.xml +++ b/elx-transport/src/test/resources/log4j2-test.xml @@ -10,4 +10,4 @@ - \ No newline at end of file + diff --git a/gradle.properties b/gradle.properties index 0662f74..e54a08b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.19 +version = 2.2.1.20 gradle.wrapper.version = 6.4.1 xbib-metrics.version = 2.1.0