cluster target health management, fix node test settings

2.2.1.45
Jörg Prante 3 years ago
parent f9f62fc56c
commit b6479b216a

@ -3,13 +3,18 @@ package org.xbib.elx.api;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable { public interface BasicClient extends Closeable {
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; /**
* Initiative the client
* @param settings settings
*/
void init(Settings settings);
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);
/** /**
* Set an Elasticsearch client to extend from it. May be null for TransportClient. * Set an Elasticsearch client to extend from it. May be null for TransportClient.
@ -24,14 +29,6 @@ public interface BasicClient extends Closeable {
*/ */
ElasticsearchClient getClient(); ElasticsearchClient getClient();
/**
* Initiative the extended client, the bulk metric and bulk controller,
* creates instances and connect to cluster, if required.
*
* @param settings settings
* @throws IOException if init fails
*/
void init(Settings settings) throws IOException;
/** /**
* Get cluster name. * Get cluster name.
@ -50,14 +47,8 @@ public interface BasicClient extends Closeable {
/** /**
* Wait for cluster being healthy. * Wait for cluster being healthy.
*
* @param healthColor cluster health color to wait for
* @param maxWaitTime time value
* @param timeUnit time unit
*/ */
void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); void waitForHealthyCluster();
void waitForShards(long maxWaitTime, TimeUnit timeUnit);
long getSearchableDocs(IndexDefinition indexDefinition); long getSearchableDocs(IndexDefinition indexDefinition);

@ -112,7 +112,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
} }
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
return this; return this;
} }
@ -129,6 +129,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
long maxWaitTime = indexDefinition.getMaxWaitTime(); long maxWaitTime = indexDefinition.getMaxWaitTime();
TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit();
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
waitForHealthyCluster();
return this; return this;
} }
@ -409,7 +410,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (forceMergeResponse.getFailedShards() > 0) { if (forceMergeResponse.getFailedShards() > 0) {
throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
} }
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
return true; return true;
} }
@ -421,7 +422,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
.settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); .settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
} }
@Override @Override

@ -76,10 +76,10 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
this.settings = settings;
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
this.settings = settings;
setClient(createClient(settings)); setClient(createClient(settings));
} }
} }
@ -105,13 +105,13 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
if (key == null) { if (key == null) {
throw new IOException("no key given"); throw new IllegalArgumentException("no key given");
} }
if (value == null) { if (value == null) {
throw new IOException("no value given"); throw new IllegalArgumentException("no value given");
} }
Settings.Builder updateSettingsBuilder = Settings.builder(); Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString()); updateSettingsBuilder.put(key, value.toString());
@ -120,26 +120,16 @@ public abstract class AbstractBasicClient implements BasicClient {
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
} }
protected Long getThreadPoolQueueSize(String name) {
ensureClientIsPresent();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.threadPool(true);
NodesInfoResponse nodesInfoResponse =
client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool();
for (ThreadPool.Info info : threadPoolInfo) {
if (info.getName().equals(name)) {
return info.getQueueSize().getSingles();
}
}
}
return null;
}
@Override @Override
public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { public void waitForHealthyCluster() {
ensureClientIsPresent(); ensureClientIsPresent();
String statusString = settings.get(Parameters.CLUSTER_TARGET_HEALTH.getName(),
Parameters.CLUSTER_TARGET_HEALTH.getString());
String waitTimeStr = settings.get(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(),
Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getString());
TimeValue timeValue = TimeValue.parseTimeValue(waitTimeStr, TimeValue.timeValueMinutes(30L), "");
long maxWaitTime = timeValue.minutes();
TimeUnit timeUnit = TimeUnit.MINUTES;
logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit); logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit);
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
@ -155,23 +145,6 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
} }
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent();
logger.info("waiting for cluster shard settling");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.waitForRelocatingShards(0)
.timeout(timeout);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards: " + timeout;
logger.error(message);
throw new IllegalStateException(message);
}
}
@Override @Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
@ -228,7 +201,7 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
} }
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; protected abstract ElasticsearchClient createClient(Settings settings);
protected abstract void closeClient(Settings settings) throws IOException; protected abstract void closeClient(Settings settings) throws IOException;
@ -284,4 +257,21 @@ public abstract class AbstractBasicClient implements BasicClient {
throw new IllegalArgumentException("unknown time unit: " + timeUnit); throw new IllegalArgumentException("unknown time unit: " + timeUnit);
} }
} }
protected Long getThreadPoolQueueSize(String name) {
ensureClientIsPresent();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.threadPool(true);
NodesInfoResponse nodesInfoResponse =
client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool();
for (ThreadPool.Info info : threadPoolInfo) {
if (info.getName().equals(name)) {
return info.getQueueSize().getSingles();
}
}
}
return null;
}
} }

@ -40,7 +40,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
if (closed.compareAndSet(true, false)) { if (closed.compareAndSet(true, false)) {
super.init(settings); super.init(settings);
bulkProcessor = new DefaultBulkProcessor(this, settings); bulkProcessor = new DefaultBulkProcessor(this, settings);
@ -114,7 +114,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
logger.warn("index creation of {} not acknowledged", index); logger.warn("index creation of {} not acknowledged", index);
return; return;
} }
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
} }
@Override @Override

@ -49,7 +49,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
if (closed.compareAndSet(true, false)) { if (closed.compareAndSet(true, false)) {
super.init(settings); super.init(settings);
this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); this.searchMetric = new DefaultSearchMetric(getScheduler(), settings);

@ -3,8 +3,6 @@ package org.xbib.elx.common;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.TimeUnit;
/** /**
* A mocked client, it does not perform any actions on a cluster. Useful for testing. * A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/ */
@ -28,14 +26,6 @@ public class MockAdminClient extends AbstractAdminClient {
protected void closeClient(Settings settings) { protected void closeClient(Settings settings) {
} }
@Override
public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) {
}
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override @Override
public void close() { public void close() {
// nothing to do // nothing to do

@ -26,10 +26,6 @@ public class MockBulkClient extends AbstractBulkClient {
return null; return null;
} }
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { protected ElasticsearchClient createClient(Settings settings) {
return null; return null;

@ -24,10 +24,6 @@ public class MockSearchClient extends AbstractSearchClient {
return null; return null;
} }
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { protected ElasticsearchClient createClient(Settings settings) {
return null; return null;

@ -2,6 +2,10 @@ package org.xbib.elx.common;
public enum Parameters { public enum Parameters {
CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"),
CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"),
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"),

@ -2,21 +2,12 @@ package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback;
@ -68,23 +59,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
public void beforeEach(ExtensionContext extensionContext) throws Exception { public void beforeEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns) Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode(); helper.startNode();
try {
ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
}
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
} }
@Override @Override
@ -197,7 +172,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(getNodeSettings()) .put(getNodeSettings())
.put("node.name", "1") .put("node.name", "1")
.build(); .build();
return new MockNode(nodeSettings); this.node = new MockNode(nodeSettings);
return node;
} }
} }
} }

@ -35,7 +35,7 @@ class BulkClientTest {
void testNewIndex() throws Exception { void testNewIndex() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -46,7 +46,7 @@ class BulkClientTest {
void testSingleDoc() throws Exception { void testSingleDoc() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -65,7 +65,7 @@ class BulkClientTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -93,7 +93,7 @@ class BulkClientTest {
long timeout = 120L; long timeout = 120L;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);

@ -37,7 +37,7 @@ class DuplicateIDTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");

@ -40,11 +40,11 @@ class IndexPruneTest {
void testPrune() throws IOException { void testPrune() throws IOException {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");

@ -38,11 +38,11 @@ class IndexShiftTest {
void testIndexShift() throws Exception { void testIndexShift() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift"); indexDefinition.setFullIndexName("test_shift");

@ -40,7 +40,7 @@ class SearchTest {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition); bulkClient.startBulk(indexDefinition);
@ -60,7 +60,7 @@ class SearchTest {
} }
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client())
.setSearchClientProvider(NodeSearchClientProvider.class) .setSearchClientProvider(NodeSearchClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
// test stream count // test stream count
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb

@ -36,11 +36,11 @@ class SmokeTest {
void smokeTest() throws Exception { void smokeTest() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
@ -51,22 +51,20 @@ class SmokeTest {
indexDefinition.setType("doc"); indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush(); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.checkMapping(indexDefinition); adminClient.checkMapping(indexDefinition);
bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.waitForResponses(30, TimeUnit.SECONDS);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition, 2); adminClient.updateReplicaLevel(indexDefinition, 1);
int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {

@ -102,7 +102,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private Helper create() { private Helper create() {
Helper helper = new Helper(); Helper helper = new Helper();
String home = System.getProperty("path.home", "build/elxnode"); String home = System.getProperty("path.home", "build/elxnode/");
helper.setHome(home + "/" + helper.randomString(8)); helper.setHome(home + "/" + helper.randomString(8));
helper.setClusterName("test-cluster-" + helper.randomString(8)); helper.setClusterName("test-cluster-" + helper.randomString(8));
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
@ -137,11 +137,17 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return cluster; return cluster;
} }
Settings getNodeSettings() { Settings getClientSettings() {
return Settings.builder() return Settings.builder()
.put("name", "elx-client") // for threadpool name
.put("cluster.name", getClusterName()) .put("cluster.name", getClusterName())
.put("path.home", getHome()) .put("path.home", getHome())
.put("name", getClusterName() + "-name-client") // for threadpool setting
.put("node.name", getClusterName() + "-client")
.put("node.master", "false")
.put("node.data", "false")
.put("node.client", "true")
.put("cluster.target_health", "YELLOW")
.put("cluster.target_health_timeout", "1m")
.build(); .build();
} }
@ -168,10 +174,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
Node buildNode() { Node buildNode() {
String id = "1";
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings()) .put("cluster.name", getClusterName())
.put("node.name", id) .put("path.home", getHome())
.put("name", getClusterName() + "-name-server") // for threadpool setting
.put("node.name", getClusterName() + "-server")
.put("node.master", "true")
.put("node.data", "true")
.put("node.client", "false")
.build(); .build();
this.node = new MockNode(nodeSettings); this.node = new MockNode(nodeSettings);
return node; return node;

@ -5,8 +5,6 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractAdminClient; import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
/** /**
* Transport admin client. * Transport admin client.
*/ */
@ -20,12 +18,12 @@ public class TransportAdminClient extends AbstractAdminClient {
} }
@Override @Override
public ElasticsearchClient createClient(Settings settings) throws IOException { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings); return helper.createClient(settings);
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
super.init(settings); super.init(settings);
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
} }

@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractBulkClient; import org.xbib.elx.common.AbstractBulkClient;
import java.io.IOException;
/** /**
* Transport search client with additional methods. * Transport search client with additional methods.
@ -19,12 +18,12 @@ public class TransportBulkClient extends AbstractBulkClient {
} }
@Override @Override
public ElasticsearchClient createClient(Settings settings) throws IOException { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings); return helper.createClient(settings);
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
super.init(settings); super.init(settings);
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
} }

@ -46,7 +46,7 @@ public class TransportClientHelper {
} }
} }
public void init(TransportClient transportClient, Settings settings) throws IOException { public void init(TransportClient transportClient, Settings settings) {
Collection<TransportAddress> addrs = findAddresses(settings); Collection<TransportAddress> addrs = findAddresses(settings);
if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) {
throw new NoNodeAvailableException("no cluster nodes available, check settings = " throw new NoNodeAvailableException("no cluster nodes available, check settings = "
@ -54,7 +54,7 @@ public class TransportClientHelper {
} }
} }
private Collection<TransportAddress> findAddresses(Settings settings) throws IOException { private Collection<TransportAddress> findAddresses(Settings settings) {
final int defaultPort = settings.getAsInt("port", 9300); final int defaultPort = settings.getAsInt("port", 9300);
Collection<TransportAddress> addresses = new ArrayList<>(); Collection<TransportAddress> addresses = new ArrayList<>();
for (String hostname : settings.getAsArray("host")) { for (String hostname : settings.getAsArray("host")) {
@ -66,16 +66,20 @@ public class TransportClientHelper {
int port = Integer.parseInt(splitHost[1]); int port = Integer.parseInt(splitHost[1]);
TransportAddress address = new InetSocketTransportAddress(inetAddress, port); TransportAddress address = new InetSocketTransportAddress(inetAddress, port);
addresses.add(address); addresses.add(address);
} catch (NumberFormatException e) { } catch (IOException e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
} }
} else if (splitHost.length == 1) { } else if (splitHost.length == 1) {
String host = splitHost[0]; try {
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); String host = splitHost[0];
TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
addresses.add(address); TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort);
addresses.add(address);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
} else { } else {
throw new IOException("invalid hostname specification: " + hostname); throw new IllegalArgumentException("invalid hostname specification: " + hostname);
} }
} }
return addresses; return addresses;

@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractSearchClient; import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
/** /**
* Transport search client with additional methods. * Transport search client with additional methods.
@ -19,12 +18,12 @@ public class TransportSearchClient extends AbstractSearchClient {
} }
@Override @Override
public ElasticsearchClient createClient(Settings settings) throws IOException { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings); return helper.createClient(settings);
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
super.init(settings); super.init(settings);
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
} }

@ -35,7 +35,7 @@ class BulkClientTest {
void testNewIndex() throws Exception { void testNewIndex() throws Exception {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -46,7 +46,7 @@ class BulkClientTest {
void testSingleDoc() throws Exception { void testSingleDoc() throws Exception {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -65,7 +65,7 @@ class BulkClientTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -91,7 +91,7 @@ class BulkClientTest {
final long timeout = 120L; final long timeout = 120L;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test"); indexDefinition.setFullIndexName("test");

@ -32,7 +32,7 @@ class DuplicateIDTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);

@ -40,11 +40,11 @@ class IndexPruneTest {
void testPrune() throws IOException { void testPrune() throws IOException {
try (TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");

@ -40,11 +40,11 @@ class IndexShiftTest {
void testIndexShift() throws Exception { void testIndexShift() throws Exception {
try (TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift"); indexDefinition.setFullIndexName("test_shift");

@ -42,7 +42,7 @@ class SearchTest {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition); bulkClient.startBulk(indexDefinition);
@ -62,7 +62,7 @@ class SearchTest {
} }
try (TransportSearchClient searchClient = ClientBuilder.builder() try (TransportSearchClient searchClient = ClientBuilder.builder()
.setSearchClientProvider(TransportSearchClientProvider.class) .setSearchClientProvider(TransportSearchClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
// test stream count // test stream count
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb

@ -36,11 +36,11 @@ class SmokeTest {
void smokeTest() throws Exception { void smokeTest() throws Exception {
try (TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY);
@ -50,21 +50,20 @@ class SmokeTest {
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.checkMapping(indexDefinition); adminClient.checkMapping(indexDefinition);
bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete(indexDefinition, "1"); bulkClient.delete(indexDefinition, "1");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.deleteIndex(indexDefinition); adminClient.deleteIndex(indexDefinition);
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition, 2); adminClient.updateReplicaLevel(indexDefinition, 1);
int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {

@ -2,21 +2,11 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback;
@ -66,10 +56,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Helper helper = extensionContext.getParent().isPresent() ? Helper helper = extensionContext.getParent().isPresent() ?
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
Objects.requireNonNull(helper); Objects.requireNonNull(helper);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode(); helper.startNode();
helper.greenHealth();
logger.info("cluster name = {}", helper.clusterName());
} }
@Override @Override
@ -122,8 +109,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Node node; Node node;
AbstractClient client;
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
} }
@ -140,26 +125,21 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return cluster; return cluster;
} }
Settings getNodeSettings() { Settings getClientSettings() {
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
}
Settings getTransportSettings() {
return Settings.builder() return Settings.builder()
.put("cluster.name", cluster) .put("cluster.name", cluster)
.put("path.home", getHome()) .put("path.home", getHome())
.put("host", host) .put("host", host)
.put("port", port) .put("port", port)
.put("cluster.target_health", "YELLOW")
.put("cluster.target_health_timeout", "1m")
.build(); .build();
} }
void startNode() { void startNode() {
buildNode().start(); buildNode().start();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = node.client().execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress() Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress(); .publishAddress();
if (obj instanceof InetSocketTransportAddress) { if (obj instanceof InetSocketTransportAddress) {
@ -180,48 +160,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
Node buildNode() { Node buildNode() {
String id = "1";
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings()) .put("cluster.name", getClusterName())
.put("node.name", id) .put("path.home", getHome())
.put("name", getClusterName() + "-name-server") // for threadpool setting
.put("node.name", getClusterName() + "-server")
.put("node.master", "true")
.put("node.data", "true")
.put("node.client", "false")
.build(); .build();
node = new MockNode(nodeSettings); this.node = new MockNode(nodeSettings);
client = (AbstractClient) node.client();
return node; return node;
} }
void closeNodes() { void closeNodes() {
if (client != null) {
logger.info("closing client");
client.close();
}
if (node != null) { if (node != null) {
logger.info("closing node"); logger.info("closing node");
node.close(); node.close();
} }
} }
void greenHealth() throws IOException {
try {
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
}
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
}
String clusterName() {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
}
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final Random random = new SecureRandom(); private static final Random random = new SecureRandom();

@ -1,6 +1,6 @@
group = org.xbib group = org.xbib
name = elx name = elx
version = 2.2.1.44 version = 2.2.1.45
gradle.wrapper.version = 6.6.1 gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0 xbib-metrics.version = 2.1.0

Loading…
Cancel
Save