diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java index e0cdb96..e190a51 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java @@ -21,6 +21,10 @@ public interface SearchMetric extends Closeable { Count getEmptyQueries(); + Count getFailedQueries(); + + Count getTimeoutQueries(); + long elapsed(); void start(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 209f8d2..9c6bb72 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -187,7 +187,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public List resolveAlias(String alias) { if (alias == null) { - return List.of(); + return Collections.emptyList(); } ensureClientIsPresent(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); @@ -247,7 +247,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements // two situations: 1. a new alias 2. there is already an old index with the alias Optional oldIndex = resolveAlias(index).stream().sorted().findFirst(); Map oldAliasMap = oldIndex.map(this::getAliases).orElse(null); - logger.info("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap); + logger.info("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap); final List newAliases = new ArrayList<>(); final List moveAliases = new ArrayList<>(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); @@ -312,7 +312,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { - return indexDefinition != null && indexDefinition.isPruneEnabled() && indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null? + return indexDefinition != null && indexDefinition.isPruneEnabled() && indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null ? pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), indexDefinition.getDateTimePattern(), @@ -457,7 +457,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements .setDateTimePattern(dateTimePattern) .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) .setShift(settings.getAsBoolean("shift", true)) - .setShift(settings.getAsBoolean("prune", true)) + .setPrune(settings.getAsBoolean("prune", true)) .setReplicaLevel(settings.getAsInt("replica", 0)) .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) .setRetention(indexRetention) @@ -487,9 +487,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements map.keys().forEach((Consumer>) stringObjectCursor -> { ImmutableOpenMap mappings = map.get(stringObjectCursor.value); for (ObjectObjectCursor cursor : mappings) { - String mappingName = cursor.key; MappingMetadata mappingMetaData = cursor.value; - checkMapping(index, mappingName, mappingMetaData); + checkMapping(index, mappingMetaData); } }); } @@ -554,7 +553,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return result; } - private void checkMapping(String index, String type, MappingMetadata mappingMetaData) { + private void checkMapping(String index, MappingMetadata mappingMetaData) { try { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) .setIndices(index) @@ -579,8 +578,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements empty.incrementAndGet(); } }); - logger.info("index={} type={} numfields={} fieldsnotused={}", - index, type, map.size(), empty.get()); + logger.info("index={} numfields={} fieldsnotused={}", + index, map.size(), empty.get()); } } catch (Exception e) { logger.error(e.getMessage(), e); @@ -667,7 +666,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } + private static class EmptyIndexShiftResult implements IndexShiftResult { + @Override public List getMovedAliases() { return Collections.emptyList(); @@ -717,7 +718,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public String toString() { - return "PRUNED: " + indicesToDelete; + return "PRUNED: " + indicesToDelete; } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index f08191c..bd020c3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -59,7 +59,7 @@ public abstract class AbstractBasicClient implements BasicClient { this.settings = settings; setClient(createClient(settings)); } else { - logger.log(Level.WARN, "not initializing"); + logger.log(Level.WARN, "not initializing client"); } } @@ -109,7 +109,7 @@ public abstract class AbstractBasicClient implements BasicClient { ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); if (healthResponse.isTimedOut()) { - String message = "timeout waiting for cluster shards: " + timeout; + String message = "timeout while waiting for cluster shards: " + timeout; logger.error(message); throw new IllegalStateException(message); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 4adb845..7719f9d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -113,7 +113,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements return; } ensureClientIsPresent(); - waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) .setIndex(index); if (settings != null) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index b899be8..e767e95 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -122,34 +122,42 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement @Override public Stream search(Consumer queryBuilder, - TimeValue scrollTime, int scrollSize) { + TimeValue scrollTime, int scrollSize) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); ActionFuture actionFuture = searchRequestBuilder.execute(); searchMetric.getCurrentQueries().inc(); - SearchResponse originalSearchResponse = actionFuture.actionGet(); + SearchResponse initialSearchResponse = actionFuture.actionGet(); searchMetric.getCurrentQueries().dec(); searchMetric.getQueries().inc(); searchMetric.markTotalQueries(1); - if (originalSearchResponse.getHits().getTotalHits().value == 0) { + if (initialSearchResponse.getFailedShards() > 0) { + searchMetric.getFailedQueries().inc(); + } else if (initialSearchResponse.isTimedOut()) { + searchMetric.getTimeoutQueries().inc(); + } else if (initialSearchResponse.getHits().getTotalHits().value == 0) { searchMetric.getEmptyQueries().inc(); } else { searchMetric.getSucceededQueries().inc(); } - Stream infiniteResponses = Stream.iterate(originalSearchResponse, + Stream responseStream = Stream.iterate(initialSearchResponse, searchResponse -> { SearchScrollRequestBuilder searchScrollRequestBuilder = new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) - .setScrollId(searchResponse.getScrollId()) - .setScroll(scrollTime); + .setScrollId(searchResponse.getScrollId()) + .setScroll(scrollTime); ActionFuture actionFuture1 = searchScrollRequestBuilder.execute(); searchMetric.getCurrentQueries().inc(); SearchResponse searchResponse1 = actionFuture1.actionGet(); searchMetric.getCurrentQueries().dec(); searchMetric.getQueries().inc(); searchMetric.markTotalQueries(1); - if (searchResponse1.getHits().getHits().length == 0) { + if (searchResponse1.getFailedShards() > 0) { + searchMetric.getFailedQueries().inc(); + } else if (searchResponse1.isTimedOut()) { + searchMetric.getTimeoutQueries().inc(); + } else if (searchResponse1.getHits().getHits().length == 0) { searchMetric.getEmptyQueries().inc(); } else { searchMetric.getSucceededQueries().inc(); @@ -163,9 +171,9 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement .addScrollId(searchResponse.getScrollId()); clearScrollRequestBuilder.execute().actionGet(); }; - return StreamSupport.stream(TakeWhileSpliterator.over(infiniteResponses.spliterator(), + return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(), condition, lastAction), false) - .onClose(infiniteResponses::close) + .onClose(responseStream::close) .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())); } @@ -174,7 +182,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId); } - static class TakeWhileSpliterator implements Spliterator { + private static class TakeWhileSpliterator implements Spliterator { private final Spliterator source; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 5cdf37c..b63e1c6 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -72,16 +72,16 @@ public class DefaultBulkController implements BulkController { public void init(Settings settings) { bulkMetric.init(settings); int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), - Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.asInteger()); + Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), - Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.asInteger()); + Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), - TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.asInteger())); + TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), - ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.asString(), + ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), "maxVolumePerRequest")); boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), - Parameters.ENABLE_BULK_LOGGING.asBool()); + Parameters.ENABLE_BULK_LOGGING.getValue()); BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java index abc6307..71a7421 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java @@ -13,7 +13,7 @@ public class DefaultIndexRetention implements IndexRetention { this.minToKeep = 2; } - @Override + @Override public IndexRetention setDelta(int delta) { this.delta = delta; return this; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index 5f164b3..4162faf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -1,7 +1,5 @@ package org.xbib.elx.common; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.SearchMetric; import org.xbib.metrics.api.Count; @@ -12,8 +10,6 @@ import java.util.concurrent.Executors; public class DefaultSearchMetric implements SearchMetric { - private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName()); - private final Meter totalQuery; private final Count currentQuery; @@ -24,6 +20,10 @@ public class DefaultSearchMetric implements SearchMetric { private final Count emptyQueries; + private final Count failedQueries; + + private final Count timeoutQueries; + private Long started; private Long stopped; @@ -34,11 +34,12 @@ public class DefaultSearchMetric implements SearchMetric { queries = new CountMetric(); succeededQueries = new CountMetric(); emptyQueries = new CountMetric(); + failedQueries = new CountMetric(); + timeoutQueries = new CountMetric(); } @Override public void init(Settings settings) { - logger.info("init"); start(); } @@ -72,6 +73,16 @@ public class DefaultSearchMetric implements SearchMetric { return emptyQueries; } + @Override + public Count getFailedQueries() { + return failedQueries; + } + + @Override + public Count getTimeoutQueries() { + return timeoutQueries; + } + @Override public long elapsed() { return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L; diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 69df0a4..bc7ccf7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -20,14 +20,14 @@ public enum Parameters { FLUSH_INTERVAL("flush_interval"); - boolean flag; + boolean value; int num; String string; - Parameters(boolean flag) { - this.flag = flag; + Parameters(boolean value) { + this.value = value; } Parameters(int num) { @@ -38,15 +38,15 @@ public enum Parameters { this.string = string; } - public boolean asBool() { - return flag; + public boolean getValue() { + return value; } - public int asInteger() { - return num; + public int getNum() { + return num; } - public String asString() { + public String getString() { return string; } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java index 65745bc..fa25eed 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java @@ -19,6 +19,7 @@ import org.xbib.elx.http.HttpBulkClientProvider; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -38,15 +39,14 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - final HttpAdminClient adminClient = ClientBuilder.builder() + try (HttpAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(HttpAdminClientProvider.class) .put(helper.getHttpSettings()) .build(); - final HttpBulkClient bulkClient = ClientBuilder.builder() + HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .build(); - try { + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -55,41 +55,43 @@ class IndexPruneTest { IndexDefinition indexDefinition = new DefaultIndexDefinition(); indexDefinition.setIndex("test"); indexDefinition.setFullIndexName("test1"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test2", settings); indexDefinition.setFullIndexName("test2"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test3", settings); indexDefinition.setFullIndexName("test3"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test4", settings); indexDefinition.setFullIndexName("test4"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); indexDefinition.setPrune(true); IndexRetention indexRetention = new DefaultIndexRetention(); indexDefinition.setRetention(indexRetention); + indexDefinition.setEnabled(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); - logger.info(indexPruneResult.toString()); + logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test2")); assertFalse(indexPruneResult.getDeletedIndices().contains("test3")); assertFalse(indexPruneResult.getDeletedIndices().contains("test4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test1", "test2", "test3", "test4")) { - list.add(bulkClient.isIndexExists(index)); + list.add(adminClient.isIndexExists(index)); } logger.info(list); assertFalse(list.get(0)); assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - } finally { - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - adminClient.close(); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java index 2258fc0..819b592 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -16,7 +15,6 @@ import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; import org.xbib.elx.http.HttpSearchClient; import org.xbib.elx.http.HttpSearchClientProvider; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -26,9 +24,9 @@ class SearchTest { private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); - private static final Long ACTIONS = 100L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + private static final Long MAX_ACTIONS_PER_REQUEST = 100L; private final TestExtension.Helper helper; @@ -54,13 +52,8 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); - bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions + 1, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -69,26 +62,25 @@ class SearchTest { .setSearchClientProvider(HttpSearchClientProvider.class) .put(helper.getHttpSettings()) .build()) { - Optional responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0")); - assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString()); Stream stream = searchClient.search(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery()), - TimeValue.timeValueMinutes(1), 10); + TimeValue.timeValueMillis(100), 579); long count = stream.count(); - assertEquals(numactions + 1, count); + assertEquals(numactions, count); Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> { - logger.info(id); idcount.incrementAndGet(); }); - assertEquals(numactions + 1, idcount.get()); - assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(numactions, idcount.get()); + assertEquals(275, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount()); assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java b/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java index 82452a5..8e8a202 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java @@ -85,17 +85,17 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Helper helper = extensionContext.getParent().get().getStore(ns) .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); - helper.startNode("1"); + helper.startNode(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest() .clear() .addMetric(NodesInfoRequest.Metric.HTTP.metricName()); - NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = helper.client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); TransportAddress address = response.getNodes().get(0).getInfo(HttpInfo.class).getAddress().publishAddress(); helper.httpHost = address.address().getHostName(); helper.httpPort = address.address().getPort(); logger.log(Level.INFO, "http host = " + helper.httpHost + " port = " + helper.httpPort); try { - ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, + ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) .timeout(TimeValue.timeValueSeconds(30))).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { @@ -107,7 +107,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateResponse clusterStateResponse = - helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); } @@ -122,15 +122,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } private void closeNodes(Helper helper) throws IOException { - logger.info("closing all clients"); - for (AbstractClient client : helper.clients.values()) { - client.close(); - } - logger.info("closing all nodes"); - for (Node node : helper.nodes.values()) { - if (node != null) { - node.close(); - } + logger.info("closing node"); + if (helper.node != null) { + helper.node.close(); } logger.info("all nodes closed"); } @@ -162,7 +156,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return helper; } - class Helper { + static class Helper { String home; @@ -172,9 +166,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft int httpPort; - Map nodes = new HashMap<>(); - - Map clients = new HashMap<>(); + Node node; void setHome(String home) { this.home = home; @@ -196,7 +188,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return Settings.builder() .put("cluster.name", getClusterName()) .put("path.home", getHome()) - //.put("cluster.initial_master_nodes", "1") + //.put("cluster.initial_master_nodes", ) //.put("discovery.seed_hosts", "127.0.0.1:9300") .build(); } @@ -210,12 +202,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .build(); } - void startNode(String id) throws NodeValidationException { - buildNode(id).start(); + void startNode() throws NodeValidationException { + buildNode().start(); } - ElasticsearchClient client(String id) { - return clients.get(id); + ElasticsearchClient client() { + return node.client(); } String randomString(int len) { @@ -227,17 +219,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return new String(buf); } - private Node buildNode(String id) { + private Node buildNode() { Settings nodeSettings = Settings.builder() .put(getNodeSettings()) - .put("node.name", id) - .put("path.data", getHome() + "/data-" + id) + .put("node.name", "1" ) + .put("path.data", getHome() + "/data-1") .build(); List> plugins = Collections.singletonList(Netty4Plugin.class); - Node node = new MockNode(nodeSettings, plugins); - AbstractClient client = (AbstractClient) node.client(); - nodes.put(id, node); - clients.put(id, client); + this.node = new MockNode(nodeSettings, plugins); return node; } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/package-info.java b/elx-http/src/test/java/org/xbib/elx/http/test/package-info.java deleted file mode 100644 index 2bb05c9..0000000 --- a/elx-http/src/test/java/org/xbib/elx/http/test/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * - */ -package org.xbib.elx.http.test; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 68de671..3a750ff 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -41,14 +41,14 @@ class BulkClientTest { @Test void testSingleDoc() throws Exception { - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) .build()) { bulkClient.newIndex("test"); - bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.index("test", "doc1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); @@ -61,9 +61,9 @@ class BulkClientTest { @Test void testNewIndex() throws Exception { - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) .build()) { bulkClient.newIndex("test"); @@ -72,13 +72,13 @@ class BulkClientTest { @Test void testMapping() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build()) { XContentBuilder builder = JsonXContent.contentBuilder() .startObject() @@ -96,9 +96,9 @@ class BulkClientTest { @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) .build()) { @@ -122,12 +122,14 @@ class BulkClientTest { @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); + Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) + .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) @@ -160,12 +162,12 @@ class BulkClientTest { } bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + bulkClient.refreshIndex("test"); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index 5464799..a6cfdc2 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -33,9 +33,9 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex("test"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 38b341c..85c2e2d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -19,6 +19,7 @@ import org.xbib.elx.node.NodeBulkClientProvider; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -38,13 +39,13 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) @@ -54,27 +55,33 @@ class IndexPruneTest { IndexDefinition indexDefinition = new DefaultIndexDefinition(); indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test_prune2", settings); indexDefinition.setFullIndexName("test_prune2"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test_prune3", settings); indexDefinition.setFullIndexName("test_prune3"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test_prune4", settings); indexDefinition.setFullIndexName("test_prune4"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); indexDefinition.setPrune(true); IndexRetention indexRetention = new DefaultIndexRetention(); indexDefinition.setRetention(indexRetention); + indexDefinition.setEnabled(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); + logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - list.add(bulkClient.isIndexExists(index)); + list.add(adminClient.isIndexExists(index)); } logger.info(list); assertFalse(list.get(0)); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 341a866..d3538b3 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -37,13 +37,13 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 0dcad74..b71f214 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -17,7 +16,6 @@ import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeSearchClient; import org.xbib.elx.node.NodeSearchClientProvider; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -27,9 +25,9 @@ class SearchTest { private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); - private static final Long ACTIONS = 100L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + private static final Long MAX_ACTIONS_PER_REQUEST = 100L; private final TestExtension.Helper helper; @@ -40,9 +38,9 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex("test"); @@ -54,41 +52,33 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); - bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions + 1, bulkClient.getSearchableDocs("test")); - assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); } - try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1")) + try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) .setSearchClientProvider(NodeSearchClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build()) { - Optional responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0")); - assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString()); Stream stream = searchClient.search(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery()), - TimeValue.timeValueMinutes(1), 10); + TimeValue.timeValueMillis(100), 579); long count = stream.count(); - assertEquals(numactions + 1, count); + assertEquals(numactions, count); Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); - final AtomicInteger idcount = new AtomicInteger(); - ids.forEach(id -> { - logger.info(id); - idcount.incrementAndGet(); - }); - assertEquals(numactions + 1, idcount.get()); - assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); + final AtomicInteger idcount = new AtomicInteger(0); + ids.forEach(id -> idcount.incrementAndGet()); + assertEquals(numactions, idcount.get()); + assertEquals(275, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount()); assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 353d7a8..998ad3a 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -30,13 +30,13 @@ class SmokeTest { @Test void smokeTest() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings("1")) + .put(helper.getNodeSettings()) .build()) { IndexDefinition indexDefinition = adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index b59275d..102e6f8 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -83,15 +82,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Helper helper = extensionContext.getParent().get().getStore(ns) .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); - helper.startNode("1"); + helper.startNode(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName()); - NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = helper.client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); TransportAddress address = response.getNodes().get(0).getNode().getAddress(); helper.host = address.address().getHostName(); helper.port = address.address().getPort(); logger.info("host = " + helper.host + " port = " + helper.port); try { - ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, + ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) .timeout(TimeValue.timeValueSeconds(30))).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { @@ -103,7 +102,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateResponse clusterStateResponse = - helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); logger.info("cluster up, name = {}", clusterStateResponse.getClusterName().value()); } @@ -118,13 +117,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } private void closeNodes(Helper helper) throws IOException { - logger.info("closing all nodes"); - for (Node node : helper.nodes.values()) { - if (node != null) { - node.close(); - } + logger.info("closing node"); + if (helper.node != null) { + helper.node.close(); } - logger.info("all nodes closed"); + logger.info("node closed"); } private static void deleteFiles(Path directory) throws IOException { @@ -164,7 +161,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft int port; - Map nodes = new HashMap<>(); + Node node; void setHome(String home) { this.home = home; @@ -182,7 +179,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings(String id) { + Settings getNodeSettings() { return Settings.builder() .put("cluster.name", getClusterName()) .put("path.home", getHome()) @@ -190,12 +187,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .build(); } - void startNode(String id) throws NodeValidationException { - buildNode(id).start(); + void startNode() throws NodeValidationException { + buildNode().start(); } - ElasticsearchClient client(String id) { - return nodes.get(id).client(); + ElasticsearchClient client() { + return node.client(); } String randomString(int len) { @@ -207,14 +204,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return new String(buf); } - private Node buildNode(String id) { + private Node buildNode() { Settings nodeSettings = Settings.builder() - .put(getNodeSettings(id)) - .put("node.name", id) + .put(getNodeSettings()) + .put("node.name", "1") .build(); List> plugins = Collections.singletonList(Netty4Plugin.class); - Node node = new MockNode(nodeSettings, plugins); - nodes.put(id, node); + this.node = new MockNode(nodeSettings, plugins); return node; } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java b/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java deleted file mode 100644 index 662c94e..0000000 --- a/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * - */ -package org.xbib.elx.node.test; diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index 8708d70..f24a12a 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -19,6 +19,7 @@ import org.xbib.elx.transport.TransportBulkClientProvider; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -38,15 +39,14 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - final TransportAdminClient adminClient = ClientBuilder.builder() + try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - final TransportBulkClient bulkClient = ClientBuilder.builder() + TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .build(); - try { + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -55,36 +55,39 @@ class IndexPruneTest { indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); bulkClient.newIndex("test_prune1", settings); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test_prune2", settings); indexDefinition.setFullIndexName("test_prune2"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test_prune3", settings); indexDefinition.setFullIndexName("test_prune3"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); bulkClient.newIndex("test_prune4", settings); indexDefinition.setFullIndexName("test_prune4"); - adminClient.shiftIndex(indexDefinition, List.of()); + indexDefinition.setShift(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList()); indexDefinition.setPrune(true); IndexRetention indexRetention = new DefaultIndexRetention(); indexDefinition.setRetention(indexRetention); + indexDefinition.setEnabled(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); + logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - list.add(bulkClient.isIndexExists(index)); + list.add(adminClient.isIndexExists(index)); } logger.info(list); assertFalse(list.get(0)); assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - } finally { - adminClient.close(); - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index fd24ebd..d573ebd 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -27,9 +27,9 @@ class SearchTest { private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); - private static final Long ACTIONS = 100L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + private static final Long MAX_ACTIONS_PER_REQUEST = 100L; private final TestExtension.Helper helper; @@ -55,13 +55,8 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); - bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions + 1, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -70,26 +65,23 @@ class SearchTest { .setSearchClientProvider(TransportSearchClientProvider.class) .put(helper.getTransportSettings()) .build()) { - Optional responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0")); - assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString()); Stream stream = searchClient.search(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery()), - TimeValue.timeValueMinutes(1), 10); + TimeValue.timeValueMillis(100), 579); long count = stream.count(); - assertEquals(numactions + 1, count); + assertEquals(numactions, count); Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); - final AtomicInteger idcount = new AtomicInteger(); - ids.forEach(id -> { - logger.info(id); - idcount.incrementAndGet(); - }); - assertEquals(numactions + 1, idcount.get()); - assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); + final AtomicInteger idcount = new AtomicInteger(0); + ids.forEach(id -> idcount.incrementAndGet()); + assertEquals(numactions, idcount.get()); + assertEquals(275, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount()); assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 29161eb..4139001 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -37,9 +36,7 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -83,14 +80,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Helper helper = extensionContext.getParent().get().getStore(ns) .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); - helper.startNode("1"); + helper.startNode(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName()); - NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = helper.client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); TransportAddress address = response.getNodes().get(0).getNode().getAddress(); helper.host = address.address().getHostName(); helper.port = address.address().getPort(); try { - ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, + ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) .timeout(TimeValue.timeValueSeconds(30))).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { @@ -102,7 +99,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateResponse clusterStateResponse = - helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); } @@ -117,17 +114,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } private void closeNodes(Helper helper) throws IOException { - logger.info("closing all clients"); - for (AbstractClient client : helper.clients.values()) { - client.close(); + logger.info("closing node"); + if (helper.node != null) { + helper.node.close(); } - logger.info("closing all nodes"); - for (Node node : helper.nodes.values()) { - if (node != null) { - node.close(); - } - } - logger.info("all nodes closed"); + logger.info("node closed"); } private static void deleteFiles(Path directory) throws IOException { @@ -157,7 +148,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return helper; } - class Helper { + static class Helper { String home; @@ -167,9 +158,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft int port; - Map nodes = new HashMap<>(); - - Map clients = new HashMap<>(); + Node node; void setHome(String home) { this.home = home; @@ -205,12 +194,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .build(); } - void startNode(String id) throws NodeValidationException { - buildNode(id).start(); + void startNode() throws NodeValidationException { + buildNode().start(); } - ElasticsearchClient client(String id) { - return clients.get(id); + ElasticsearchClient client() { + return node.client(); } String randomString(int len) { @@ -222,16 +211,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return new String(buf); } - private Node buildNode(String id) { + private Node buildNode() { Settings nodeSettings = Settings.builder() .put(getNodeSettings()) - .put("node.name", id) + .put("node.name", "1") .build(); List> plugins = Collections.singletonList(Netty4Plugin.class); - Node node = new MockNode(nodeSettings, plugins); - AbstractClient client = (AbstractClient) node.client(); - nodes.put(id, node); - clients.put(id, client); + this.node = new MockNode(nodeSettings, plugins); return node; } } diff --git a/gradle.properties b/gradle.properties index 2b4de0f..e29acd9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -6,9 +6,9 @@ gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0 xbib-netty-http.version = 4.1.60.0 elasticsearch.version = 7.10.2 -# ES 7.10.2.1 uses Jackson 2.10.4 +# ES 7.10.2 uses Jackson 2.10.4 jackson.version = 2.12.1 -netty.version = 4.1.58.Final +netty.version = 4.1.60.Final tcnative.version = 2.0.36.Final bouncycastle.version = 1.64 log4j.version = 2.14.0