Merge branch 'es221' of alkmene.hbz-nrw.de:joerg/elx into es221
This commit is contained in:
commit
e88e352ca5
7 changed files with 46 additions and 65 deletions
|
@ -8,6 +8,12 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public interface BasicClient extends Closeable {
|
public interface BasicClient extends Closeable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initiative the client
|
||||||
|
* @param settings settings
|
||||||
|
*/
|
||||||
|
void init(Settings settings);
|
||||||
|
|
||||||
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);
|
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -23,7 +29,6 @@ public interface BasicClient extends Closeable {
|
||||||
*/
|
*/
|
||||||
ElasticsearchClient getClient();
|
ElasticsearchClient getClient();
|
||||||
|
|
||||||
void init(Settings settings);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get cluster name.
|
* Get cluster name.
|
||||||
|
@ -40,6 +45,9 @@ public interface BasicClient extends Closeable {
|
||||||
*/
|
*/
|
||||||
String getHealthColor(long maxWaitTime, TimeUnit timeUnit);
|
String getHealthColor(long maxWaitTime, TimeUnit timeUnit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for cluster being healthy.
|
||||||
|
*/
|
||||||
void waitForHealthyCluster();
|
void waitForHealthyCluster();
|
||||||
|
|
||||||
long getSearchableDocs(IndexDefinition indexDefinition);
|
long getSearchableDocs(IndexDefinition indexDefinition);
|
||||||
|
|
|
@ -77,9 +77,9 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Settings settings) {
|
public void init(Settings settings) {
|
||||||
|
this.settings = settings;
|
||||||
if (closed.compareAndSet(false, true)) {
|
if (closed.compareAndSet(false, true)) {
|
||||||
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
||||||
this.settings = settings;
|
|
||||||
setClient(createClient(settings));
|
setClient(createClient(settings));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,23 +120,6 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Long getThreadPoolQueueSize(String name) {
|
|
||||||
ensureClientIsPresent();
|
|
||||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
|
|
||||||
nodesInfoRequest.threadPool(true);
|
|
||||||
NodesInfoResponse nodesInfoResponse =
|
|
||||||
client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
|
||||||
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
|
|
||||||
ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool();
|
|
||||||
for (ThreadPool.Info info : threadPoolInfo) {
|
|
||||||
if (info.getName().equals(name)) {
|
|
||||||
return info.getQueueSize().getSingles();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void waitForHealthyCluster() {
|
public void waitForHealthyCluster() {
|
||||||
ensureClientIsPresent();
|
ensureClientIsPresent();
|
||||||
|
@ -222,16 +205,16 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
|
|
||||||
protected abstract void closeClient(Settings settings) throws IOException;
|
protected abstract void closeClient(Settings settings) throws IOException;
|
||||||
|
|
||||||
protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) {
|
protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
|
||||||
ensureClientIsPresent();
|
ensureClientIsPresent();
|
||||||
if (index == null) {
|
if (index == null) {
|
||||||
throw new IllegalArgumentException("no index name given");
|
throw new IOException("no index name given");
|
||||||
}
|
}
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
throw new IllegalArgumentException("no key given");
|
throw new IOException("no key given");
|
||||||
}
|
}
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
throw new IllegalArgumentException("no value given");
|
throw new IOException("no value given");
|
||||||
}
|
}
|
||||||
Settings.Builder updateSettingsBuilder = Settings.builder();
|
Settings.Builder updateSettingsBuilder = Settings.builder();
|
||||||
updateSettingsBuilder.put(key, value.toString());
|
updateSettingsBuilder.put(key, value.toString());
|
||||||
|
@ -274,4 +257,21 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
throw new IllegalArgumentException("unknown time unit: " + timeUnit);
|
throw new IllegalArgumentException("unknown time unit: " + timeUnit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Long getThreadPoolQueueSize(String name) {
|
||||||
|
ensureClientIsPresent();
|
||||||
|
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
|
||||||
|
nodesInfoRequest.threadPool(true);
|
||||||
|
NodesInfoResponse nodesInfoResponse =
|
||||||
|
client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||||
|
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
|
||||||
|
ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool();
|
||||||
|
for (ThreadPool.Info info : threadPoolInfo) {
|
||||||
|
if (info.getName().equals(name)) {
|
||||||
|
return info.getQueueSize().getSingles();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,21 +2,12 @@ package org.xbib.elx.common.test;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|
||||||
import org.elasticsearch.client.ElasticsearchClient;
|
import org.elasticsearch.client.ElasticsearchClient;
|
||||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||||
|
@ -68,23 +59,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
||||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
|
||||||
helper.startNode();
|
helper.startNode();
|
||||||
try {
|
|
||||||
ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE,
|
|
||||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
|
||||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
|
||||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
|
||||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
|
||||||
+ ", from here on, everything will fail!");
|
|
||||||
}
|
|
||||||
} catch (ElasticsearchTimeoutException e) {
|
|
||||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
|
||||||
}
|
|
||||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
|
||||||
ClusterStateResponse clusterStateResponse =
|
|
||||||
helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
|
||||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -197,7 +172,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
||||||
.put(getNodeSettings())
|
.put(getNodeSettings())
|
||||||
.put("node.name", "1")
|
.put("node.name", "1")
|
||||||
.build();
|
.build();
|
||||||
return new MockNode(nodeSettings);
|
this.node = new MockNode(nodeSettings);
|
||||||
|
return node;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,6 +135,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
||||||
.put("node.client", true)
|
.put("node.client", true)
|
||||||
.put("node.master", false)
|
.put("node.master", false)
|
||||||
.put("node.data", false)
|
.put("node.data", false)
|
||||||
|
.put("cluster.target_health", "YELLOW")
|
||||||
|
.put("cluster.target_health_timeout", "1m")
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient;
|
||||||
import org.elasticsearch.client.transport.TransportClient;
|
import org.elasticsearch.client.transport.TransportClient;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.xbib.elx.common.AbstractBulkClient;
|
import org.xbib.elx.common.AbstractBulkClient;
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transport search client with additional methods.
|
* Transport search client with additional methods.
|
||||||
|
|
|
@ -5,7 +5,6 @@ import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||||
import org.elasticsearch.client.ElasticsearchClient;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
|
@ -57,7 +56,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
||||||
Helper helper = extensionContext.getParent().isPresent() ?
|
Helper helper = extensionContext.getParent().isPresent() ?
|
||||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
|
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
|
||||||
Objects.requireNonNull(helper);
|
Objects.requireNonNull(helper);
|
||||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
|
||||||
helper.startNode();
|
helper.startNode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,16 +125,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
||||||
return cluster;
|
return cluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
ElasticsearchClient client() {
|
|
||||||
return node.client();
|
|
||||||
}
|
|
||||||
|
|
||||||
Settings getClientSettings() {
|
Settings getClientSettings() {
|
||||||
return Settings.builder()
|
return Settings.builder()
|
||||||
.put("cluster.name", cluster)
|
.put("cluster.name", cluster)
|
||||||
.put("path.home", getHome())
|
.put("path.home", getHome())
|
||||||
.put("host", host)
|
.put("host", host)
|
||||||
.put("port", port)
|
.put("port", port)
|
||||||
|
.put("cluster.target_health", "YELLOW")
|
||||||
|
.put("cluster.target_health_timeout", "1m")
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,22 +160,22 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
||||||
}
|
}
|
||||||
|
|
||||||
Node buildNode() {
|
Node buildNode() {
|
||||||
node = new MockNode(Settings.builder()
|
Settings nodeSettings = Settings.builder()
|
||||||
.put("name", getClusterName() + "-server-name") // for threadpool name
|
|
||||||
.put("cluster.name", getClusterName())
|
.put("cluster.name", getClusterName())
|
||||||
.put("path.home", getHome())
|
.put("path.home", getHome())
|
||||||
.put("node.name", getClusterName() + "-node")
|
.put("name", getClusterName() + "-name-server") // for threadpool setting
|
||||||
.put("node.client", false)
|
.put("node.name", getClusterName() + "-server")
|
||||||
.put("node.master", true)
|
.put("node.master", "true")
|
||||||
.put("node.data", true)
|
.put("node.data", "true")
|
||||||
.build());
|
.put("node.client", "false")
|
||||||
|
.build();
|
||||||
|
this.node = new MockNode(nodeSettings);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
void closeNodes() {
|
void closeNodes() {
|
||||||
if (node != null) {
|
if (node != null) {
|
||||||
logger.info("closing node");
|
logger.info("closing node");
|
||||||
node.client().close();
|
|
||||||
node.close();
|
node.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
group = org.xbib
|
group = org.xbib
|
||||||
name = elx
|
name = elx
|
||||||
version = 2.2.1.44
|
version = 2.2.1.45
|
||||||
|
|
||||||
gradle.wrapper.version = 6.6.1
|
gradle.wrapper.version = 6.6.1
|
||||||
xbib-metrics.version = 2.1.0
|
xbib-metrics.version = 2.1.0
|
||||||
|
|
Loading…
Reference in a new issue