align with es221

This commit is contained in:
Jörg Prante 2021-04-09 13:01:16 +02:00
parent 95649e7950
commit 7191bb550b
25 changed files with 232 additions and 258 deletions

View file

@ -21,6 +21,10 @@ public interface SearchMetric extends Closeable {
Count getEmptyQueries(); Count getEmptyQueries();
Count getFailedQueries();
Count getTimeoutQueries();
long elapsed(); long elapsed();
void start(); void start();

View file

@ -187,7 +187,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public List<String> resolveAlias(String alias) { public List<String> resolveAlias(String alias) {
if (alias == null) { if (alias == null) {
return List.of(); return Collections.emptyList();
} }
ensureClientIsPresent(); ensureClientIsPresent();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
@ -247,7 +247,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
// two situations: 1. a new alias 2. there is already an old index with the alias // two situations: 1. a new alias 2. there is already an old index with the alias
Optional<String> oldIndex = resolveAlias(index).stream().sorted().findFirst(); Optional<String> oldIndex = resolveAlias(index).stream().sorted().findFirst();
Map<String, String> oldAliasMap = oldIndex.map(this::getAliases).orElse(null); Map<String, String> oldAliasMap = oldIndex.map(this::getAliases).orElse(null);
logger.info("old index = {} old alias map = {}", oldIndex.orElse("<not found>"), oldAliasMap); logger.info("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap);
final List<String> newAliases = new ArrayList<>(); final List<String> newAliases = new ArrayList<>();
final List<String> moveAliases = new ArrayList<>(); final List<String> moveAliases = new ArrayList<>();
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
@ -312,7 +312,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return indexDefinition != null && indexDefinition.isPruneEnabled() && indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null? return indexDefinition != null && indexDefinition.isPruneEnabled() && indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null ?
pruneIndex(indexDefinition.getIndex(), pruneIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(), indexDefinition.getFullIndexName(),
indexDefinition.getDateTimePattern(), indexDefinition.getDateTimePattern(),
@ -457,7 +457,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
.setDateTimePattern(dateTimePattern) .setDateTimePattern(dateTimePattern)
.setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) .setIgnoreErrors(settings.getAsBoolean("skiperrors", false))
.setShift(settings.getAsBoolean("shift", true)) .setShift(settings.getAsBoolean("shift", true))
.setShift(settings.getAsBoolean("prune", true)) .setPrune(settings.getAsBoolean("prune", true))
.setReplicaLevel(settings.getAsInt("replica", 0)) .setReplicaLevel(settings.getAsInt("replica", 0))
.setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS)
.setRetention(indexRetention) .setRetention(indexRetention)
@ -487,9 +487,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> { map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
ImmutableOpenMap<String, MappingMetadata> mappings = map.get(stringObjectCursor.value); ImmutableOpenMap<String, MappingMetadata> mappings = map.get(stringObjectCursor.value);
for (ObjectObjectCursor<String, MappingMetadata> cursor : mappings) { for (ObjectObjectCursor<String, MappingMetadata> cursor : mappings) {
String mappingName = cursor.key;
MappingMetadata mappingMetaData = cursor.value; MappingMetadata mappingMetaData = cursor.value;
checkMapping(index, mappingName, mappingMetaData); checkMapping(index, mappingMetaData);
} }
}); });
} }
@ -554,7 +553,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return result; return result;
} }
private void checkMapping(String index, String type, MappingMetadata mappingMetaData) { private void checkMapping(String index, MappingMetadata mappingMetaData) {
try { try {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(index) .setIndices(index)
@ -579,8 +578,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
empty.incrementAndGet(); empty.incrementAndGet();
} }
}); });
logger.info("index={} type={} numfields={} fieldsnotused={}", logger.info("index={} numfields={} fieldsnotused={}",
index, type, map.size(), empty.get()); index, map.size(), empty.get());
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
@ -667,7 +666,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
} }
} }
private static class EmptyIndexShiftResult implements IndexShiftResult { private static class EmptyIndexShiftResult implements IndexShiftResult {
@Override @Override
public List<String> getMovedAliases() { public List<String> getMovedAliases() {
return Collections.emptyList(); return Collections.emptyList();
@ -717,7 +718,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public String toString() { public String toString() {
return "PRUNED: " + indicesToDelete; return "PRUNED: " + indicesToDelete;
} }
} }

View file

@ -59,7 +59,7 @@ public abstract class AbstractBasicClient implements BasicClient {
this.settings = settings; this.settings = settings;
setClient(createClient(settings)); setClient(createClient(settings));
} else { } else {
logger.log(Level.WARN, "not initializing"); logger.log(Level.WARN, "not initializing client");
} }
} }
@ -109,7 +109,7 @@ public abstract class AbstractBasicClient implements BasicClient {
ClusterHealthResponse healthResponse = ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) { if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards: " + timeout; String message = "timeout while waiting for cluster shards: " + timeout;
logger.error(message); logger.error(message);
throw new IllegalStateException(message); throw new IllegalStateException(message);
} }

View file

@ -113,7 +113,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
return; return;
} }
ensureClientIsPresent(); ensureClientIsPresent();
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
.setIndex(index); .setIndex(index);
if (settings != null) { if (settings != null) {

View file

@ -122,34 +122,42 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
@Override @Override
public Stream<SearchHit> search(Consumer<SearchRequestBuilder> queryBuilder, public Stream<SearchHit> search(Consumer<SearchRequestBuilder> queryBuilder,
TimeValue scrollTime, int scrollSize) { TimeValue scrollTime, int scrollSize) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); searchMetric.getCurrentQueries().inc();
SearchResponse originalSearchResponse = actionFuture.actionGet(); SearchResponse initialSearchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec(); searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc(); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1); searchMetric.markTotalQueries(1);
if (originalSearchResponse.getHits().getTotalHits().value == 0) { if (initialSearchResponse.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc();
} else if (initialSearchResponse.isTimedOut()) {
searchMetric.getTimeoutQueries().inc();
} else if (initialSearchResponse.getHits().getTotalHits().value == 0) {
searchMetric.getEmptyQueries().inc(); searchMetric.getEmptyQueries().inc();
} else { } else {
searchMetric.getSucceededQueries().inc(); searchMetric.getSucceededQueries().inc();
} }
Stream<SearchResponse> infiniteResponses = Stream.iterate(originalSearchResponse, Stream<SearchResponse> responseStream = Stream.iterate(initialSearchResponse,
searchResponse -> { searchResponse -> {
SearchScrollRequestBuilder searchScrollRequestBuilder = SearchScrollRequestBuilder searchScrollRequestBuilder =
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
.setScrollId(searchResponse.getScrollId()) .setScrollId(searchResponse.getScrollId())
.setScroll(scrollTime); .setScroll(scrollTime);
ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); searchMetric.getCurrentQueries().inc();
SearchResponse searchResponse1 = actionFuture1.actionGet(); SearchResponse searchResponse1 = actionFuture1.actionGet();
searchMetric.getCurrentQueries().dec(); searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc(); searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1); searchMetric.markTotalQueries(1);
if (searchResponse1.getHits().getHits().length == 0) { if (searchResponse1.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc();
} else if (searchResponse1.isTimedOut()) {
searchMetric.getTimeoutQueries().inc();
} else if (searchResponse1.getHits().getHits().length == 0) {
searchMetric.getEmptyQueries().inc(); searchMetric.getEmptyQueries().inc();
} else { } else {
searchMetric.getSucceededQueries().inc(); searchMetric.getSucceededQueries().inc();
@ -163,9 +171,9 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
.addScrollId(searchResponse.getScrollId()); .addScrollId(searchResponse.getScrollId());
clearScrollRequestBuilder.execute().actionGet(); clearScrollRequestBuilder.execute().actionGet();
}; };
return StreamSupport.stream(TakeWhileSpliterator.over(infiniteResponses.spliterator(), return StreamSupport.stream(TakeWhileSpliterator.over(responseStream.spliterator(),
condition, lastAction), false) condition, lastAction), false)
.onClose(infiniteResponses::close) .onClose(responseStream::close)
.flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())); .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
} }
@ -174,7 +182,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId); return search(queryBuilder, TimeValue.timeValueMinutes(1), 1000).map(SearchHit::getId);
} }
static class TakeWhileSpliterator<T> implements Spliterator<T> { private static class TakeWhileSpliterator<T> implements Spliterator<T> {
private final Spliterator<T> source; private final Spliterator<T> source;

View file

@ -72,16 +72,16 @@ public class DefaultBulkController implements BulkController {
public void init(Settings settings) { public void init(Settings settings) {
bulkMetric.init(settings); bulkMetric.init(settings);
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(),
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.asInteger()); Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(),
Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.asInteger()); Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum());
TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(),
TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.asInteger())); TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum()));
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.asString(), ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest")); "maxVolumePerRequest"));
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
Parameters.ENABLE_BULK_LOGGING.asBool()); Parameters.ENABLE_BULK_LOGGING.getValue());
BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest) .setBulkActions(maxActionsPerRequest)

View file

@ -13,7 +13,7 @@ public class DefaultIndexRetention implements IndexRetention {
this.minToKeep = 2; this.minToKeep = 2;
} }
@Override @Override
public IndexRetention setDelta(int delta) { public IndexRetention setDelta(int delta) {
this.delta = delta; this.delta = delta;
return this; return this;

View file

@ -1,7 +1,5 @@
package org.xbib.elx.common; package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.SearchMetric; import org.xbib.elx.api.SearchMetric;
import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Count;
@ -12,8 +10,6 @@ import java.util.concurrent.Executors;
public class DefaultSearchMetric implements SearchMetric { public class DefaultSearchMetric implements SearchMetric {
private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName());
private final Meter totalQuery; private final Meter totalQuery;
private final Count currentQuery; private final Count currentQuery;
@ -24,6 +20,10 @@ public class DefaultSearchMetric implements SearchMetric {
private final Count emptyQueries; private final Count emptyQueries;
private final Count failedQueries;
private final Count timeoutQueries;
private Long started; private Long started;
private Long stopped; private Long stopped;
@ -34,11 +34,12 @@ public class DefaultSearchMetric implements SearchMetric {
queries = new CountMetric(); queries = new CountMetric();
succeededQueries = new CountMetric(); succeededQueries = new CountMetric();
emptyQueries = new CountMetric(); emptyQueries = new CountMetric();
failedQueries = new CountMetric();
timeoutQueries = new CountMetric();
} }
@Override @Override
public void init(Settings settings) { public void init(Settings settings) {
logger.info("init");
start(); start();
} }
@ -72,6 +73,16 @@ public class DefaultSearchMetric implements SearchMetric {
return emptyQueries; return emptyQueries;
} }
@Override
public Count getFailedQueries() {
return failedQueries;
}
@Override
public Count getTimeoutQueries() {
return timeoutQueries;
}
@Override @Override
public long elapsed() { public long elapsed() {
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L; return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;

View file

@ -20,14 +20,14 @@ public enum Parameters {
FLUSH_INTERVAL("flush_interval"); FLUSH_INTERVAL("flush_interval");
boolean flag; boolean value;
int num; int num;
String string; String string;
Parameters(boolean flag) { Parameters(boolean value) {
this.flag = flag; this.value = value;
} }
Parameters(int num) { Parameters(int num) {
@ -38,15 +38,15 @@ public enum Parameters {
this.string = string; this.string = string;
} }
public boolean asBool() { public boolean getValue() {
return flag; return value;
} }
public int asInteger() { public int getNum() {
return num; return num;
} }
public String asString() { public String getString() {
return string; return string;
} }
} }

View file

@ -19,6 +19,7 @@ import org.xbib.elx.http.HttpBulkClientProvider;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -38,15 +39,14 @@ class IndexPruneTest {
@Test @Test
void testPrune() throws IOException { void testPrune() throws IOException {
final HttpAdminClient adminClient = ClientBuilder.builder() try (HttpAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(HttpAdminClientProvider.class) .setAdminClientProvider(HttpAdminClientProvider.class)
.put(helper.getHttpSettings()) .put(helper.getHttpSettings())
.build(); .build();
final HttpBulkClient bulkClient = ClientBuilder.builder() HttpBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(HttpBulkClientProvider.class) .setBulkClientProvider(HttpBulkClientProvider.class)
.put(helper.getHttpSettings()) .put(helper.getHttpSettings())
.build(); .build()) {
try {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
@ -55,41 +55,43 @@ class IndexPruneTest {
IndexDefinition indexDefinition = new DefaultIndexDefinition(); IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test"); indexDefinition.setIndex("test");
indexDefinition.setFullIndexName("test1"); indexDefinition.setFullIndexName("test1");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test2", settings); bulkClient.newIndex("test2", settings);
indexDefinition.setFullIndexName("test2"); indexDefinition.setFullIndexName("test2");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test3", settings); bulkClient.newIndex("test3", settings);
indexDefinition.setFullIndexName("test3"); indexDefinition.setFullIndexName("test3");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test4", settings); bulkClient.newIndex("test4", settings);
indexDefinition.setFullIndexName("test4"); indexDefinition.setFullIndexName("test4");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexRetention indexRetention = new DefaultIndexRetention(); IndexRetention indexRetention = new DefaultIndexRetention();
indexDefinition.setRetention(indexRetention); indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info(indexPruneResult.toString()); logger.info("prune result = " + indexPruneResult);
assertTrue(indexPruneResult.getDeletedIndices().contains("test1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test1"));
assertTrue(indexPruneResult.getDeletedIndices().contains("test2")); assertTrue(indexPruneResult.getDeletedIndices().contains("test2"));
assertFalse(indexPruneResult.getDeletedIndices().contains("test3")); assertFalse(indexPruneResult.getDeletedIndices().contains("test3"));
assertFalse(indexPruneResult.getDeletedIndices().contains("test4")); assertFalse(indexPruneResult.getDeletedIndices().contains("test4"));
List<Boolean> list = new ArrayList<>(); List<Boolean> list = new ArrayList<>();
for (String index : Arrays.asList("test1", "test2", "test3", "test4")) { for (String index : Arrays.asList("test1", "test2", "test3", "test4")) {
list.add(bulkClient.isIndexExists(index)); list.add(adminClient.isIndexExists(index));
} }
logger.info(list); logger.info(list);
assertFalse(list.get(0)); assertFalse(list.get(0));
assertFalse(list.get(1)); assertFalse(list.get(1));
assertTrue(list.get(2)); assertTrue(list.get(2));
assertTrue(list.get(3)); assertTrue(list.get(3));
} finally {
bulkClient.close();
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkController().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkController().getLastBulkError());
adminClient.close();
} }
} }
} }

View file

@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -16,7 +15,6 @@ import org.xbib.elx.http.HttpBulkClient;
import org.xbib.elx.http.HttpBulkClientProvider; import org.xbib.elx.http.HttpBulkClientProvider;
import org.xbib.elx.http.HttpSearchClient; import org.xbib.elx.http.HttpSearchClient;
import org.xbib.elx.http.HttpSearchClientProvider; import org.xbib.elx.http.HttpSearchClientProvider;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -26,9 +24,9 @@ class SearchTest {
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
private static final Long ACTIONS = 100L; private static final Long ACTIONS = 100000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 10L; private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
@ -54,13 +52,8 @@ class SearchTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test"); bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test")); assertEquals(numactions, bulkClient.getSearchableDocs("test"));
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test");
assertEquals(numactions + 1, bulkClient.getSearchableDocs("test"));
} }
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkController().getLastBulkError());
} }
@ -69,26 +62,25 @@ class SearchTest {
.setSearchClientProvider(HttpSearchClientProvider.class) .setSearchClientProvider(HttpSearchClientProvider.class)
.put(helper.getHttpSettings()) .put(helper.getHttpSettings())
.build()) { .build()) {
Optional<GetResponse> responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0"));
assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString());
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices("test") .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMinutes(1), 10); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions + 1, count); assertEquals(numactions, count);
Stream<String> ids = searchClient.getIds(qb -> qb Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test") .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery())); .setQuery(QueryBuilders.matchAllQuery()));
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> { ids.forEach(id -> {
logger.info(id);
idcount.incrementAndGet(); idcount.incrementAndGet();
}); });
assertEquals(numactions + 1, idcount.get()); assertEquals(numactions, idcount.get());
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); assertEquals(275, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount());
} }
} }
} }

View file

@ -85,17 +85,17 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Helper helper = extensionContext.getParent().get().getStore(ns) Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1"); helper.startNode();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest() NodesInfoRequest nodesInfoRequest = new NodesInfoRequest()
.clear() .clear()
.addMetric(NodesInfoRequest.Metric.HTTP.metricName()); .addMetric(NodesInfoRequest.Metric.HTTP.metricName());
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = helper.client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getInfo(HttpInfo.class).getAddress().publishAddress(); TransportAddress address = response.getNodes().get(0).getInfo(HttpInfo.class).getAddress().publishAddress();
helper.httpHost = address.address().getHostName(); helper.httpHost = address.address().getHostName();
helper.httpPort = address.address().getPort(); helper.httpPort = address.address().getPort();
logger.log(Level.INFO, "http host = " + helper.httpHost + " port = " + helper.httpPort); logger.log(Level.INFO, "http host = " + helper.httpHost + " port = " + helper.httpPort);
try { try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet(); .timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
@ -107,7 +107,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse = ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
} }
@ -122,15 +122,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
private void closeNodes(Helper helper) throws IOException { private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients"); logger.info("closing node");
for (AbstractClient client : helper.clients.values()) { if (helper.node != null) {
client.close(); helper.node.close();
}
logger.info("closing all nodes");
for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
} }
logger.info("all nodes closed"); logger.info("all nodes closed");
} }
@ -162,7 +156,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return helper; return helper;
} }
class Helper { static class Helper {
String home; String home;
@ -172,9 +166,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
int httpPort; int httpPort;
Map<String, Node> nodes = new HashMap<>(); Node node;
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
@ -196,7 +188,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder() return Settings.builder()
.put("cluster.name", getClusterName()) .put("cluster.name", getClusterName())
.put("path.home", getHome()) .put("path.home", getHome())
//.put("cluster.initial_master_nodes", "1") //.put("cluster.initial_master_nodes", )
//.put("discovery.seed_hosts", "127.0.0.1:9300") //.put("discovery.seed_hosts", "127.0.0.1:9300")
.build(); .build();
} }
@ -210,12 +202,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.build(); .build();
} }
void startNode(String id) throws NodeValidationException { void startNode() throws NodeValidationException {
buildNode(id).start(); buildNode().start();
} }
ElasticsearchClient client(String id) { ElasticsearchClient client() {
return clients.get(id); return node.client();
} }
String randomString(int len) { String randomString(int len) {
@ -227,17 +219,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return new String(buf); return new String(buf);
} }
private Node buildNode(String id) { private Node buildNode() {
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings()) .put(getNodeSettings())
.put("node.name", id) .put("node.name", "1" )
.put("path.data", getHome() + "/data-" + id) .put("path.data", getHome() + "/data-1")
.build(); .build();
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class); List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins); this.node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
return node; return node;
} }
} }

View file

@ -1,4 +0,0 @@
/**
*
*/
package org.xbib.elx.http.test;

View file

@ -41,14 +41,14 @@ class BulkClientTest {
@Test @Test
void testSingleDoc() throws Exception { void testSingleDoc() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30))
.build()) { .build()) {
bulkClient.newIndex("test"); bulkClient.newIndex("test");
bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index("test", "doc1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush(); bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
@ -61,9 +61,9 @@ class BulkClientTest {
@Test @Test
void testNewIndex() throws Exception { void testNewIndex() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build()) { .build()) {
bulkClient.newIndex("test"); bulkClient.newIndex("test");
@ -72,13 +72,13 @@ class BulkClientTest {
@Test @Test
void testMapping() throws Exception { void testMapping() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build()) { .build()) {
XContentBuilder builder = JsonXContent.contentBuilder() XContentBuilder builder = JsonXContent.contentBuilder()
.startObject() .startObject()
@ -96,9 +96,9 @@ class BulkClientTest {
@Test @Test
void testRandomDocs() throws Exception { void testRandomDocs() throws Exception {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build()) { .build()) {
@ -122,12 +122,14 @@ class BulkClientTest {
@Test @Test
void testThreadedRandomDocs() throws Exception { void testThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors(); int maxthreads = Runtime.getRuntime().availableProcessors();
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
final long actions = ACTIONS; final long actions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads) .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads)
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build()) { .build()) {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
@ -160,12 +162,12 @@ class BulkClientTest {
} }
bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS);
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
bulkClient.refreshIndex("test");
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test"));
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkController().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkController().getLastBulkError());
bulkClient.refreshIndex("test");
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test"));
} }
} }
} }

View file

@ -33,9 +33,9 @@ class DuplicateIDTest {
@Test @Test
void testDuplicateDocIDs() throws Exception { void testDuplicateDocIDs() throws Exception {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
bulkClient.newIndex("test"); bulkClient.newIndex("test");

View file

@ -19,6 +19,7 @@ import org.xbib.elx.node.NodeBulkClientProvider;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -38,13 +39,13 @@ class IndexPruneTest {
@Test @Test
void testPrune() throws IOException { void testPrune() throws IOException {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build()) { .build()) {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
@ -54,27 +55,33 @@ class IndexPruneTest {
IndexDefinition indexDefinition = new DefaultIndexDefinition(); IndexDefinition indexDefinition = new DefaultIndexDefinition();
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune1"); indexDefinition.setFullIndexName("test_prune1");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune2", settings); bulkClient.newIndex("test_prune2", settings);
indexDefinition.setFullIndexName("test_prune2"); indexDefinition.setFullIndexName("test_prune2");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune3", settings); bulkClient.newIndex("test_prune3", settings);
indexDefinition.setFullIndexName("test_prune3"); indexDefinition.setFullIndexName("test_prune3");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune4", settings); bulkClient.newIndex("test_prune4", settings);
indexDefinition.setFullIndexName("test_prune4"); indexDefinition.setFullIndexName("test_prune4");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexRetention indexRetention = new DefaultIndexRetention(); IndexRetention indexRetention = new DefaultIndexRetention();
indexDefinition.setRetention(indexRetention); indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult);
assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1"));
assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2"));
assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3"));
assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4"));
List<Boolean> list = new ArrayList<>(); List<Boolean> list = new ArrayList<>();
for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) {
list.add(bulkClient.isIndexExists(index)); list.add(adminClient.isIndexExists(index));
} }
logger.info(list); logger.info(list);
assertFalse(list.get(0)); assertFalse(list.get(0));

View file

@ -37,13 +37,13 @@ class IndexShiftTest {
@Test @Test
void testIndexShift() throws Exception { void testIndexShift() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build()) { .build()) {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)

View file

@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -17,7 +16,6 @@ import org.xbib.elx.node.NodeBulkClientProvider;
import org.xbib.elx.node.NodeSearchClient; import org.xbib.elx.node.NodeSearchClient;
import org.xbib.elx.node.NodeSearchClientProvider; import org.xbib.elx.node.NodeSearchClientProvider;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -27,9 +25,9 @@ class SearchTest {
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
private static final Long ACTIONS = 100L; private static final Long ACTIONS = 100000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 10L; private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
@ -40,9 +38,9 @@ class SearchTest {
@Test @Test
void testDocStream() throws Exception { void testDocStream() throws Exception {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
bulkClient.newIndex("test"); bulkClient.newIndex("test");
@ -54,41 +52,33 @@ class SearchTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test"); bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test")); assertEquals(numactions, bulkClient.getSearchableDocs("test"));
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test");
assertEquals(numactions + 1, bulkClient.getSearchableDocs("test"));
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkController().getLastBulkError());
} }
assertNull(bulkClient.getBulkController().getLastBulkError()); assertNull(bulkClient.getBulkController().getLastBulkError());
} }
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1")) try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client())
.setSearchClientProvider(NodeSearchClientProvider.class) .setSearchClientProvider(NodeSearchClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build()) { .build()) {
Optional<GetResponse> responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0"));
assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString());
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices("test") .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMinutes(1), 10); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions + 1, count); assertEquals(numactions, count);
Stream<String> ids = searchClient.getIds(qb -> qb Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test") .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery())); .setQuery(QueryBuilders.matchAllQuery()));
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger(0);
ids.forEach(id -> { ids.forEach(id -> idcount.incrementAndGet());
logger.info(id); assertEquals(numactions, idcount.get());
idcount.incrementAndGet(); assertEquals(275, searchClient.getSearchMetric().getQueries().getCount());
}); assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(numactions + 1, idcount.get());
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount());
} }
} }
} }

View file

@ -30,13 +30,13 @@ class SmokeTest {
@Test @Test
void smokeTest() throws Exception { void smokeTest() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings("1")) .put(helper.getNodeSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);

View file

@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
@ -83,15 +82,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Helper helper = extensionContext.getParent().get().getStore(ns) Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1"); helper.startNode();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName()); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName());
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = helper.client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getNode().getAddress(); TransportAddress address = response.getNodes().get(0).getNode().getAddress();
helper.host = address.address().getHostName(); helper.host = address.address().getHostName();
helper.port = address.address().getPort(); helper.port = address.address().getPort();
logger.info("host = " + helper.host + " port = " + helper.port); logger.info("host = " + helper.host + " port = " + helper.port);
try { try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet(); .timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
@ -103,7 +102,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse = ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster up, name = {}", clusterStateResponse.getClusterName().value()); logger.info("cluster up, name = {}", clusterStateResponse.getClusterName().value());
} }
@ -118,13 +117,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
private void closeNodes(Helper helper) throws IOException { private void closeNodes(Helper helper) throws IOException {
logger.info("closing all nodes"); logger.info("closing node");
for (Node node : helper.nodes.values()) { if (helper.node != null) {
if (node != null) { helper.node.close();
node.close();
}
} }
logger.info("all nodes closed"); logger.info("node closed");
} }
private static void deleteFiles(Path directory) throws IOException { private static void deleteFiles(Path directory) throws IOException {
@ -164,7 +161,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
int port; int port;
Map<String, Node> nodes = new HashMap<>(); Node node;
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
@ -182,7 +179,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return cluster; return cluster;
} }
Settings getNodeSettings(String id) { Settings getNodeSettings() {
return Settings.builder() return Settings.builder()
.put("cluster.name", getClusterName()) .put("cluster.name", getClusterName())
.put("path.home", getHome()) .put("path.home", getHome())
@ -190,12 +187,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.build(); .build();
} }
void startNode(String id) throws NodeValidationException { void startNode() throws NodeValidationException {
buildNode(id).start(); buildNode().start();
} }
ElasticsearchClient client(String id) { ElasticsearchClient client() {
return nodes.get(id).client(); return node.client();
} }
String randomString(int len) { String randomString(int len) {
@ -207,14 +204,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return new String(buf); return new String(buf);
} }
private Node buildNode(String id) { private Node buildNode() {
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings(id)) .put(getNodeSettings())
.put("node.name", id) .put("node.name", "1")
.build(); .build();
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class); List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins); this.node = new MockNode(nodeSettings, plugins);
nodes.put(id, node);
return node; return node;
} }
} }

View file

@ -1,4 +0,0 @@
/**
*
*/
package org.xbib.elx.node.test;

View file

@ -19,6 +19,7 @@ import org.xbib.elx.transport.TransportBulkClientProvider;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -38,15 +39,14 @@ class IndexPruneTest {
@Test @Test
void testPrune() throws IOException { void testPrune() throws IOException {
final TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build(); .build();
final TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build(); .build()) {
try {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
@ -55,36 +55,39 @@ class IndexPruneTest {
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");
indexDefinition.setFullIndexName("test_prune1"); indexDefinition.setFullIndexName("test_prune1");
bulkClient.newIndex("test_prune1", settings); bulkClient.newIndex("test_prune1", settings);
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune2", settings); bulkClient.newIndex("test_prune2", settings);
indexDefinition.setFullIndexName("test_prune2"); indexDefinition.setFullIndexName("test_prune2");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune3", settings); bulkClient.newIndex("test_prune3", settings);
indexDefinition.setFullIndexName("test_prune3"); indexDefinition.setFullIndexName("test_prune3");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
bulkClient.newIndex("test_prune4", settings); bulkClient.newIndex("test_prune4", settings);
indexDefinition.setFullIndexName("test_prune4"); indexDefinition.setFullIndexName("test_prune4");
adminClient.shiftIndex(indexDefinition, List.of()); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList());
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexRetention indexRetention = new DefaultIndexRetention(); IndexRetention indexRetention = new DefaultIndexRetention();
indexDefinition.setRetention(indexRetention); indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult);
assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1"));
assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2"));
assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3"));
assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4"));
List<Boolean> list = new ArrayList<>(); List<Boolean> list = new ArrayList<>();
for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) {
list.add(bulkClient.isIndexExists(index)); list.add(adminClient.isIndexExists(index));
} }
logger.info(list); logger.info(list);
assertFalse(list.get(0)); assertFalse(list.get(0));
assertFalse(list.get(1)); assertFalse(list.get(1));
assertTrue(list.get(2)); assertTrue(list.get(2));
assertTrue(list.get(3)); assertTrue(list.get(3));
} finally {
adminClient.close();
bulkClient.close();
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkController().getLastBulkError());
} }

View file

@ -27,9 +27,9 @@ class SearchTest {
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
private static final Long ACTIONS = 100L; private static final Long ACTIONS = 100000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 10L; private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
private final TestExtension.Helper helper; private final TestExtension.Helper helper;
@ -55,13 +55,8 @@ class SearchTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test"); bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test")); assertEquals(numactions, bulkClient.getSearchableDocs("test"));
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test");
assertEquals(numactions + 1, bulkClient.getSearchableDocs("test"));
} }
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) { if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError()); logger.error("error", bulkClient.getBulkController().getLastBulkError());
} }
@ -70,26 +65,23 @@ class SearchTest {
.setSearchClientProvider(TransportSearchClientProvider.class) .setSearchClientProvider(TransportSearchClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getTransportSettings())
.build()) { .build()) {
Optional<GetResponse> responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0"));
assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString());
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices("test") .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()), .setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMinutes(1), 10); TimeValue.timeValueMillis(100), 579);
long count = stream.count(); long count = stream.count();
assertEquals(numactions + 1, count); assertEquals(numactions, count);
Stream<String> ids = searchClient.getIds(qb -> qb Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test") .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery())); .setQuery(QueryBuilders.matchAllQuery()));
final AtomicInteger idcount = new AtomicInteger(); final AtomicInteger idcount = new AtomicInteger(0);
ids.forEach(id -> { ids.forEach(id -> idcount.incrementAndGet());
logger.info(id); assertEquals(numactions, idcount.get());
idcount.incrementAndGet(); assertEquals(275, searchClient.getSearchMetric().getQueries().getCount());
}); assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(numactions + 1, idcount.get());
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount());
} }
} }
} }

View file

@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
@ -37,9 +36,7 @@ import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor; import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -83,14 +80,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Helper helper = extensionContext.getParent().get().getStore(ns) Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1"); helper.startNode();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName()); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName());
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = helper.client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getNode().getAddress(); TransportAddress address = response.getNodes().get(0).getNode().getAddress();
helper.host = address.address().getHostName(); helper.host = address.address().getHostName();
helper.port = address.address().getPort(); helper.port = address.address().getPort();
try { try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet(); .timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
@ -102,7 +99,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse = ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
} }
@ -117,17 +114,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
private void closeNodes(Helper helper) throws IOException { private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients"); logger.info("closing node");
for (AbstractClient client : helper.clients.values()) { if (helper.node != null) {
client.close(); helper.node.close();
} }
logger.info("closing all nodes"); logger.info("node closed");
for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
logger.info("all nodes closed");
} }
private static void deleteFiles(Path directory) throws IOException { private static void deleteFiles(Path directory) throws IOException {
@ -157,7 +148,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return helper; return helper;
} }
class Helper { static class Helper {
String home; String home;
@ -167,9 +158,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
int port; int port;
Map<String, Node> nodes = new HashMap<>(); Node node;
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
@ -205,12 +194,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.build(); .build();
} }
void startNode(String id) throws NodeValidationException { void startNode() throws NodeValidationException {
buildNode(id).start(); buildNode().start();
} }
ElasticsearchClient client(String id) { ElasticsearchClient client() {
return clients.get(id); return node.client();
} }
String randomString(int len) { String randomString(int len) {
@ -222,16 +211,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return new String(buf); return new String(buf);
} }
private Node buildNode(String id) { private Node buildNode() {
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings()) .put(getNodeSettings())
.put("node.name", id) .put("node.name", "1")
.build(); .build();
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class); List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins); this.node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
return node; return node;
} }
} }

View file

@ -6,9 +6,9 @@ gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0 xbib-metrics.version = 2.1.0
xbib-netty-http.version = 4.1.60.0 xbib-netty-http.version = 4.1.60.0
elasticsearch.version = 7.10.2 elasticsearch.version = 7.10.2
# ES 7.10.2.1 uses Jackson 2.10.4 # ES 7.10.2 uses Jackson 2.10.4
jackson.version = 2.12.1 jackson.version = 2.12.1
netty.version = 4.1.58.Final netty.version = 4.1.60.Final
tcnative.version = 2.0.36.Final tcnative.version = 2.0.36.Final
bouncycastle.version = 1.64 bouncycastle.version = 1.64
log4j.version = 2.14.0 log4j.version = 2.14.0