parent
2b3ac518b5
commit
d4e40145be
@ -0,0 +1,25 @@
|
||||
package org.xbib.elx.api;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
||||
public interface ReadClient {
|
||||
|
||||
ActionFuture<GetResponse> get(GetRequest getRequest);
|
||||
|
||||
void get(GetRequest request, ActionListener<GetResponse> listener);
|
||||
|
||||
ActionFuture<MultiGetResponse> multiGet(MultiGetRequest request);
|
||||
|
||||
void multiGet(MultiGetRequest request, ActionListener<MultiGetResponse> listener);
|
||||
|
||||
ActionFuture<SearchResponse> search(SearchRequest request);
|
||||
|
||||
void search(SearchRequest request, ActionListener<SearchResponse> listener);
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package org.xbib.elx.api;
|
||||
|
||||
public interface ReadClientProvider<C extends ReadClient> {
|
||||
|
||||
C getReadClient();
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
package org.xbib.elx.common.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore
|
||||
public class ClusterBlockTest extends TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
@Before
|
||||
public void startNodes() {
|
||||
try {
|
||||
setClusterName("test-cluster-" + System.getProperty("user.name"));
|
||||
startNode("1");
|
||||
// do not wait for green health state
|
||||
logger.info("ready");
|
||||
} catch (Throwable t) {
|
||||
logger.error("startNodes failed", t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put(super.getNodeSettings())
|
||||
.put("discovery.zen.minimum_master_nodes", 2) // block until we have two nodes
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test(expected = ClusterBlockException.class)
|
||||
public void testClusterBlock() throws Exception {
|
||||
Client client = client("1");
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field1", "value1").endObject();
|
||||
IndexRequestBuilder irb = client.prepareIndex("test", "test", "1").setSource(builder);
|
||||
BulkRequestBuilder brb = client.prepareBulk();
|
||||
brb.add(irb);
|
||||
brb.execute().actionGet();
|
||||
}
|
||||
}
|
@ -1,207 +0,0 @@
|
||||
package org.xbib.elx.common.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
public class TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
private Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
private String cluster;
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
@Before
|
||||
public void startNodes() {
|
||||
try {
|
||||
logger.info("starting");
|
||||
setClusterName("test-cluster-" + System.getProperty("user.name"));
|
||||
startNode("1");
|
||||
findNodeAddress();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
} catch (Throwable t) {
|
||||
logger.error(t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopNodes() {
|
||||
try {
|
||||
closeNodes();
|
||||
} catch (Exception e) {
|
||||
logger.error("can not close nodes", e);
|
||||
} finally {
|
||||
try {
|
||||
deleteFiles();
|
||||
logger.info("data files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Settings getTransportSettings() {
|
||||
return Settings.builder()
|
||||
.put("host", host)
|
||||
.put("port", port)
|
||||
.put("cluster.name", cluster)
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected static String getHome() {
|
||||
return System.getProperty("path.home", System.getProperty("user.dir"));
|
||||
}
|
||||
|
||||
protected void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
protected AbstractClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
protected void setClusterName(String cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
protected String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
protected String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private void closeNodes() throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
clients.clear();
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
nodes.clear();
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private void findNodeAddress() {
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
|
||||
TransportAddress address= response.getNodes().iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
logger.info("clients={}", clients);
|
||||
return node;
|
||||
}
|
||||
|
||||
private static void deleteFiles() throws IOException {
|
||||
Path directory = Paths.get(getHome() + "/data");
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,224 @@
|
||||
package org.xbib.elx.common.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.api.extension.ParameterContext;
|
||||
import org.junit.jupiter.api.extension.ParameterResolutionException;
|
||||
import org.junit.jupiter.api.extension.ParameterResolver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Junit 5 extension for testing Elasticsearch.
|
||||
* The extension will be instantiated as a singleton.
|
||||
* For parallel test method executions, for example in gradle, it requires a helper class
|
||||
* to ensure different ES homes/clusters for each run.
|
||||
*/
|
||||
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
private static final ExtensionContext.Namespace ns =
|
||||
ExtensionContext.Namespace.create(TestExtension.class);
|
||||
|
||||
@Override
|
||||
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
return parameterContext.getParameter().getType().equals(Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
// initialize new helper here, increase counter
|
||||
return extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
TransportAddress address = response.getNodes().get(0).getTransport().getAddress().publishAddress();
|
||||
String host = address.address().getHostName();
|
||||
int port = address.address().getPort();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("data files wiped: " + helper.getHome());
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : helper.nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
}
|
||||
|
||||
class Helper {
|
||||
|
||||
String home;
|
||||
|
||||
String cluster;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
void setHome(String home) {
|
||||
this.home = home;
|
||||
}
|
||||
|
||||
String getHome() {
|
||||
return home;
|
||||
}
|
||||
|
||||
void setClusterName(String cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", getClusterName())
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package org.xbib.elx.http.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.api.IndexPruneResult;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.http.ExtendedHttpClient;
|
||||
import org.xbib.elx.http.ExtendedHttpClientProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Disabled
|
||||
@ExtendWith(TestExtension.class)
|
||||
class IndexPruneTest {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName());
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
IndexPruneTest(TestExtension.Helper helper) {
|
||||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPrune() throws IOException {
|
||||
final ExtendedHttpClient client = ClientBuilder.builder(helper.client("1"))
|
||||
.put(helper.getHttpSettings())
|
||||
.provider(ExtendedHttpClientProvider.class)
|
||||
.build();
|
||||
try {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
client.newIndex("test1", settings);
|
||||
client.shiftIndex("test", "test1", Collections.emptyList());
|
||||
client.newIndex("test2", settings);
|
||||
client.shiftIndex("test", "test2", Collections.emptyList());
|
||||
client.newIndex("test3", settings);
|
||||
client.shiftIndex("test", "test3", Collections.emptyList());
|
||||
client.newIndex("test4", settings);
|
||||
client.shiftIndex("test", "test4", Collections.emptyList());
|
||||
IndexPruneResult indexPruneResult =
|
||||
client.pruneIndex("test", "test4", 2, 2, true);
|
||||
assertTrue(indexPruneResult.getDeletedIndices().contains("test1"));
|
||||
assertTrue(indexPruneResult.getDeletedIndices().contains("test2"));
|
||||
assertFalse(indexPruneResult.getDeletedIndices().contains("test3"));
|
||||
assertFalse(indexPruneResult.getDeletedIndices().contains("test4"));
|
||||
List<Boolean> list = new ArrayList<>();
|
||||
for (String index : Arrays.asList("test1", "test2", "test3", "test4")) {
|
||||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
|
||||
indicesExistsRequest.indices(index);
|
||||
IndicesExistsResponse indicesExistsResponse =
|
||||
client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
|
||||
list.add(indicesExistsResponse.isExists());
|
||||
}
|
||||
logger.info(list);
|
||||
assertFalse(list.get(0));
|
||||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
package org.xbib.elx.http.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexingStats;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.http.ExtendedHttpClient;
|
||||
import org.xbib.elx.http.ExtendedHttpClientProvider;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
@Ignore
|
||||
public class ReplicaTest extends TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName());
|
||||
|
||||
@Test
|
||||
public void testReplicaLevel() throws Exception {
|
||||
|
||||
// we need nodes for replica levels
|
||||
startNode("2");
|
||||
startNode("3");
|
||||
startNode("4");
|
||||
|
||||
Settings settingsTest1 = Settings.builder()
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.number_of_replicas", 3)
|
||||
.build();
|
||||
|
||||
Settings settingsTest2 = Settings.builder()
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.number_of_replicas", 1)
|
||||
.build();
|
||||
|
||||
final ExtendedHttpClient client = ClientBuilder.builder()
|
||||
.provider(ExtendedHttpClientProvider.class)
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.newIndex("test1", settingsTest1, new HashMap<>())
|
||||
.newIndex("test2", settingsTest2, new HashMap<>());
|
||||
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
|
||||
for (int i = 0; i < 1234; i++) {
|
||||
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
for (int i = 0; i < 1234; i++) {
|
||||
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
logger.info("refreshing");
|
||||
client.refreshIndex("test1");
|
||||
client.refreshIndex("test2");
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
|
||||
.setIndices("test1", "test2")
|
||||
.setQuery(matchAllQuery());
|
||||
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
|
||||
logger.info("query total hits={}", hits);
|
||||
assertEquals(2468, hits);
|
||||
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.getClient(), IndicesStatsAction.INSTANCE)
|
||||
.all();
|
||||
IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet();
|
||||
for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) {
|
||||
IndexStats indexStats = m.getValue();
|
||||
CommonStats commonStats = indexStats.getTotal();
|
||||
IndexingStats indexingStats = commonStats.getIndexing();
|
||||
IndexingStats.Stats stats = indexingStats.getTotal();
|
||||
logger.info("index {}: count = {}", m.getKey(), stats.getIndexCount());
|
||||
for (Map.Entry<Integer, IndexShardStats> me : indexStats.getIndexShards().entrySet()) {
|
||||
IndexShardStats indexShardStats = me.getValue();
|
||||
CommonStats commonShardStats = indexShardStats.getTotal();
|
||||
logger.info("shard {} count = {}", me.getKey(),
|
||||
commonShardStats.getIndexing().getTotal().getIndexCount());
|
||||
}
|
||||
}
|
||||
try {
|
||||
client.deleteIndex("test1")
|
||||
.deleteIndex("test2");
|
||||
} catch (Exception e) {
|
||||
logger.error("delete index failed, ignored. Reason:", e);
|
||||
}
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateReplicaLevel() throws Exception {
|
||||
|
||||
long numberOfShards = 2;
|
||||
int replicaLevel = 3;
|
||||
|
||||
// we need 3 nodes for replica level 3
|
||||
startNode("2");
|
||||
startNode("3");
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", numberOfShards)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
|
||||
final ExtendedHttpClient client = ClientBuilder.builder()
|
||||
.provider(ExtendedHttpClientProvider.class)
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.newIndex("replicatest", settings, new HashMap<>());
|
||||
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
|
||||
for (int i = 0; i < 12345; i++) {
|
||||
client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS);
|
||||
assertEquals(replicaLevel, client.getReplicaLevel("replicatest"));
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,198 +0,0 @@
|
||||
package org.xbib.elx.http.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
private Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
private AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
private String cluster;
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
@Before
|
||||
public void startNodes() {
|
||||
try {
|
||||
logger.info("starting");
|
||||
this.cluster = "test-helper-cluster-" + counter.incrementAndGet();
|
||||
startNode("1");
|
||||
findNodeAddress();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.error("startNodes failed", t);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopNodes() {
|
||||
try {
|
||||
closeNodes();
|
||||
} catch (Exception e) {
|
||||
logger.error("can not close nodes", e);
|
||||
} finally {
|
||||
try {
|
||||
deleteFiles();
|
||||
logger.info("data files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Settings getTransportSettings() {
|
||||
return Settings.builder()
|
||||
.put("host", host)
|
||||
.put("port", port)
|
||||
.put("cluster.name", cluster)
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("discovery.zen.minimum_master_nodes", "1")
|
||||
.put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
|
||||
.put("node.max_local_storage_nodes", 10) // allow many nodes to initialize here
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected static String getHome() {
|
||||
return System.getProperty("path.home", System.getProperty("user.dir"));
|
||||
}
|
||||
|
||||
protected void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
protected AbstractClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
protected String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
protected String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private void closeNodes() throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
clients.clear();
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
nodes.clear();
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private void findNodeAddress() {
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
|
||||
TransportAddress address= response.getNodes().iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
|
||||
private static void deleteFiles() throws IOException {
|
||||
Path directory = Paths.get(getHome() + "/data");
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,236 @@
|
||||
package org.xbib.elx.http.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.api.extension.ParameterContext;
|
||||
import org.junit.jupiter.api.extension.ParameterResolutionException;
|
||||
import org.junit.jupiter.api.extension.ParameterResolver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Junit 5 extension for testing Elasticsearch.
|
||||
* The extension will be instantiated as a singleton.
|
||||
* For parallel test method executions, for example in gradle, it requires a helper class
|
||||
* to ensure different ES homes/clusters for each run.
|
||||
*/
|
||||
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
private static final ExtensionContext.Namespace ns =
|
||||
ExtensionContext.Namespace.create(TestExtension.class);
|
||||
|
||||
@Override
|
||||
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
return parameterContext.getParameter().getType().equals(Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
// initialize new helper here, increase counter
|
||||
return extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
TransportAddress address = response.getNodes().get(0).getHttp().getAddress().publishAddress();
|
||||
helper.httpHost = address.address().getHostName();
|
||||
helper.httpPort = address.address().getPort();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("data files wiped: " + helper.getHome());
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : helper.nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
}
|
||||
|
||||
class Helper {
|
||||
|
||||
String home;
|
||||
|
||||
String cluster;
|
||||
|
||||
String httpHost;
|
||||
|
||||
int httpPort;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
void setHome(String home) {
|
||||
this.home = home;
|
||||
}
|
||||
|
||||
String getHome() {
|
||||
return home;
|
||||
}
|
||||
|
||||
void setClusterName(String cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", getClusterName())
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
Settings getHttpSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", getClusterName())
|
||||
.put("path.home", getHome())
|
||||
.put("host", httpHost)
|
||||
.put("port", httpPort)
|
||||
.build();
|
||||
}
|
||||
|
||||
void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
package org.xbib.elx.node.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexingStats;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.node.ExtendedNodeClient;
|
||||
import org.xbib.elx.node.ExtendedNodeClientProvider;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
@Ignore
|
||||
public class ReplicaTest extends TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getName());
|
||||
|
||||
@Test
|
||||
public void testReplicaLevel() throws Exception {
|
||||
|
||||
// we need nodes for replica levels
|
||||
startNode("2");
|
||||
startNode("3");
|
||||
startNode("4");
|
||||
|
||||
Settings settingsTest1 = Settings.builder()
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.number_of_replicas", 3)
|
||||
.build();
|
||||
|
||||
Settings settingsTest2 = Settings.builder()
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.number_of_replicas", 1)
|
||||
.build();
|
||||
|
||||
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
|
||||
.provider(ExtendedNodeClientProvider.class)
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.newIndex("test1", settingsTest1, new HashMap<>())
|
||||
.newIndex("test2", settingsTest2, new HashMap<>());
|
||||
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
|
||||
for (int i = 0; i < 1234; i++) {
|
||||
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
for (int i = 0; i < 1234; i++) {
|
||||
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
logger.info("refreshing");
|
||||
client.refreshIndex("test1");
|
||||
client.refreshIndex("test2");
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
|
||||
.setIndices("test1", "test2")
|
||||
.setQuery(matchAllQuery());
|
||||
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
|
||||
logger.info("query total hits={}", hits);
|
||||
assertEquals(2468, hits);
|
||||
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.getClient(), IndicesStatsAction.INSTANCE)
|
||||
.all();
|
||||
IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet();
|
||||
for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) {
|
||||
IndexStats indexStats = m.getValue();
|
||||
CommonStats commonStats = indexStats.getTotal();
|
||||
IndexingStats indexingStats = commonStats.getIndexing();
|
||||
IndexingStats.Stats stats = indexingStats.getTotal();
|
||||
logger.info("index {}: count = {}", m.getKey(), stats.getIndexCount());
|
||||
for (Map.Entry<Integer, IndexShardStats> me : indexStats.getIndexShards().entrySet()) {
|
||||
IndexShardStats indexShardStats = me.getValue();
|
||||
CommonStats commonShardStats = indexShardStats.getTotal();
|
||||
logger.info("shard {} count = {}", me.getKey(),
|
||||
commonShardStats.getIndexing().getTotal().getIndexCount());
|
||||
}
|
||||
}
|
||||
try {
|
||||
client.deleteIndex("test1")
|
||||
.deleteIndex("test2");
|
||||
} catch (Exception e) {
|
||||
logger.error("delete index failed, ignored. Reason:", e);
|
||||
}
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateReplicaLevel() throws Exception {
|
||||
|
||||
long numberOfShards = 2;
|
||||
int replicaLevel = 3;
|
||||
|
||||
// we need 3 nodes for replica level 3
|
||||
startNode("2");
|
||||
startNode("3");
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", numberOfShards)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
|
||||
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
|
||||
.provider(ExtendedNodeClientProvider.class)
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.newIndex("replicatest", settings, new HashMap<>());
|
||||
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
|
||||
for (int i = 0; i < 12345; i++) {
|
||||
client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS);
|
||||
assertEquals(replicaLevel, client.getReplicaLevel("replicatest"));
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,204 +0,0 @@
|
||||
package org.xbib.elx.node.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
public class TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
private Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
private String cluster;
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
@Before
|
||||
public void startNodes() {
|
||||
try {
|
||||
logger.info("starting");
|
||||
this.cluster = "test-cluster-" + System.getProperty("user.name");
|
||||
startNode("1");
|
||||
findNodeAddress();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
} catch (Throwable t) {
|
||||
logger.error(t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopNodes() {
|
||||
try {
|
||||
closeNodes();
|
||||
} catch (Exception e) {
|
||||
logger.error("can not close nodes", e);
|
||||
} finally {
|
||||
try {
|
||||
deleteFiles();
|
||||
logger.info("data files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Settings getTransportSettings() {
|
||||
return Settings.builder()
|
||||
.put("host", host)
|
||||
.put("port", port)
|
||||
.put("cluster.name", cluster)
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("discovery.zen.minimum_master_nodes", "1")
|
||||
.put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
|
||||
.put("node.max_local_storage_nodes", 10) // allow many nodes to initialize here
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected static String getHome() {
|
||||
return System.getProperty("path.home", System.getProperty("user.dir"));
|
||||
}
|
||||
|
||||
protected void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
protected AbstractClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
protected String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
protected String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private void closeNodes() throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
clients.clear();
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
nodes.clear();
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private void findNodeAddress() {
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
|
||||
TransportAddress address= response.getNodes().iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
|
||||
private static void deleteFiles() throws IOException {
|
||||
Path directory = Paths.get(getHome() + "/data");
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,236 @@
|
||||
package org.xbib.elx.node.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.api.extension.ParameterContext;
|
||||
import org.junit.jupiter.api.extension.ParameterResolutionException;
|
||||
import org.junit.jupiter.api.extension.ParameterResolver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Junit 5 extension for testing Elasticsearch.
|
||||
* The extension will be instantiated as a singleton.
|
||||
* For parallel test method executions, for example in gradle, it requires a helper class
|
||||
* to ensure different ES homes/clusters for each run.
|
||||
*/
|
||||
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
private static final ExtensionContext.Namespace ns =
|
||||
ExtensionContext.Namespace.create(TestExtension.class);
|
||||
|
||||
@Override
|
||||
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
return parameterContext.getParameter().getType().equals(Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
// initialize new helper here, increase counter
|
||||
return extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
TransportAddress address = response.getNodes().get(0).getTransport().getAddress().publishAddress();
|
||||
helper.host = address.address().getHostName();
|
||||
helper.port = address.address().getPort();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("data files wiped: " + helper.getHome());
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : helper.nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
}
|
||||
|
||||
class Helper {
|
||||
|
||||
String home;
|
||||
|
||||
String cluster;
|
||||
|
||||
String host;
|
||||
|
||||
int port;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
void setHome(String home) {
|
||||
this.home = home;
|
||||
}
|
||||
|
||||
String getHome() {
|
||||
return home;
|
||||
}
|
||||
|
||||
void setClusterName(String cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", getClusterName())
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
Settings getTransportSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("path.home", getHome())
|
||||
.put("host", host)
|
||||
.put("port", port)
|
||||
.build();
|
||||
}
|
||||
|
||||
void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,161 +0,0 @@
|
||||
package org.xbib.elx.transport.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.shard.IndexingStats;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.transport.ExtendedTransportClient;
|
||||
import org.xbib.elx.transport.ExtendedTransportClientProvider;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
@Ignore
|
||||
public class ReplicaTest extends TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getName());
|
||||
|
||||
@Test
|
||||
public void testReplicaLevel() throws Exception {
|
||||
|
||||
// we need nodes for replica levels
|
||||
startNode("2");
|
||||
startNode("3");
|
||||
startNode("4");
|
||||
|
||||
Settings settingsTest1 = Settings.builder()
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.number_of_replicas", 3)
|
||||
.build();
|
||||
|
||||
Settings settingsTest2 = Settings.builder()
|
||||
.put("index.number_of_shards", 2)
|
||||
.put("index.number_of_replicas", 1)
|
||||
.build();
|
||||
|
||||
final ExtendedTransportClient client = ClientBuilder.builder()
|
||||
.provider(ExtendedTransportClientProvider.class)
|
||||
.put(getTransportSettings())
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.newIndex("test1", settingsTest1, new HashMap<>())
|
||||
.newIndex("test2", settingsTest2, new HashMap<>());
|
||||
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
|
||||
for (int i = 0; i < 1234; i++) {
|
||||
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
for (int i = 0; i < 1234; i++) {
|
||||
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
logger.info("refreshing");
|
||||
client.refreshIndex("test1");
|
||||
client.refreshIndex("test2");
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder();
|
||||
builder.query(QueryBuilders.matchAllQuery());
|
||||
builder.size(0);
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices("test1", "test2");
|
||||
searchRequest.source(builder);
|
||||
SearchResponse searchResponse =
|
||||
client.getClient().execute(SearchAction.INSTANCE, searchRequest).actionGet();
|
||||
long hits = searchResponse.getHits().getTotalHits();
|
||||
logger.info("query total hits={}", hits);
|
||||
assertEquals(2468, hits);
|
||||
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
|
||||
indicesStatsRequest.all();
|
||||
IndicesStatsResponse response =
|
||||
client.getClient().execute(IndicesStatsAction.INSTANCE, indicesStatsRequest).actionGet();
|
||||
for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) {
|
||||
IndexStats indexStats = m.getValue();
|
||||
CommonStats commonStats = indexStats.getTotal();
|
||||
IndexingStats indexingStats = commonStats.getIndexing();
|
||||
IndexingStats.Stats stats = indexingStats.getTotal();
|
||||
logger.info("index {}: count = {}", m.getKey(), stats.getIndexCount());
|
||||
for (Map.Entry<Integer, IndexShardStats> me : indexStats.getIndexShards().entrySet()) {
|
||||
IndexShardStats indexShardStats = me.getValue();
|
||||
CommonStats commonShardStats = indexShardStats.getTotal();
|
||||
logger.info("shard {} count = {}", me.getKey(),
|
||||
commonShardStats.getIndexing().getTotal().getIndexCount());
|
||||
}
|
||||
}
|
||||
try {
|
||||
client.deleteIndex("test1")
|
||||
.deleteIndex("test2");
|
||||
} catch (Exception e) {
|
||||
logger.error("delete index failed, ignored. Reason:", e);
|
||||
}
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateReplicaLevel() throws Exception {
|
||||
|
||||
long numberOfShards = 2;
|
||||
int replicaLevel = 3;
|
||||
|
||||
// we need 3 nodes for replica level 3
|
||||
startNode("2");
|
||||
startNode("3");
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", numberOfShards)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
|
||||
final ExtendedTransportClient client = ClientBuilder.builder()
|
||||
.provider(ExtendedTransportClientProvider.class)
|
||||
.put(getTransportSettings())
|
||||
.build();
|
||||
|
||||
try {
|
||||
client.newIndex("replicatest", settings, new HashMap<>());
|
||||
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
|
||||
for (int i = 0; i < 12345; i++) {
|
||||
client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS);
|
||||
assertEquals(replicaLevel, client.getReplicaLevel("replicatest"));
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,204 +0,0 @@
|
||||
package org.xbib.elx.transport.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
public class TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
private Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
private String cluster;
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
@Before
|
||||
public void startNodes() {
|
||||
try {
|
||||
logger.info("starting");
|
||||
this.cluster = "test-cluster-" + System.getProperty("user.name");
|
||||
startNode("1");
|
||||
findNodeAddress();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
} catch (Throwable t) {
|
||||
logger.error(t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopNodes() {
|
||||
try {
|
||||
closeNodes();
|
||||
} catch (Exception e) {
|
||||
logger.error("can not close nodes", e);
|
||||
} finally {
|
||||
try {
|
||||
deleteFiles();
|
||||
logger.info("data files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Settings getTransportSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("path.home", getHome())
|
||||
.put("host", host)
|
||||
.put("port", port)
|
||||
.build();
|
||||
}
|
||||
|
||||
protected Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("discovery.zen.minimum_master_nodes", "1")
|
||||
.put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
|
||||
.put("node.max_local_storage_nodes", 10) // allow many nodes to initialize here
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
protected static String getHome() {
|
||||
return System.getProperty("path.home", System.getProperty("user.dir"));
|
||||
}
|
||||
|
||||
protected void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
protected AbstractClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
protected String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
protected String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private void closeNodes() throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
clients.clear();
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
nodes.clear();
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private void findNodeAddress() {
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
|
||||
TransportAddress address= response.getNodes().iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
|
||||
private static void deleteFiles() throws IOException {
|
||||
Path directory = Paths.get(getHome() + "/data");
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,236 @@
|
||||
package org.xbib.elx.transport.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeValidationException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.api.extension.ParameterContext;
|
||||
import org.junit.jupiter.api.extension.ParameterResolutionException;
|
||||
import org.junit.jupiter.api.extension.ParameterResolver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Junit 5 extension for testing Elasticsearch.
|
||||
* The extension will be instantiated as a singleton.
|
||||
* For parallel test method executions, for example in gradle, it requires a helper class
|
||||
* to ensure different ES homes/clusters for each run.
|
||||
*/
|
||||
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
private static final ExtensionContext.Namespace ns =
|
||||
ExtensionContext.Namespace.create(TestExtension.class);
|
||||
|
||||
@Override
|
||||
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
return parameterContext.getParameter().getType().equals(Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
// initialize new helper here, increase counter
|
||||
return extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
TransportAddress address = response.getNodes().get(0).getTransport().getAddress().publishAddress();
|
||||
helper.host = address.address().getHostName();
|
||||
helper.port = address.address().getPort();
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("data files wiped: " + helper.getHome());
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : helper.nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
logger.info("all nodes closed");
|
||||
}
|
||||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
||||
Files.delete(dir);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
}
|
||||
|
||||
class Helper {
|
||||
|
||||
String home;
|
||||
|
||||
String cluster;
|
||||
|
||||
String host;
|
||||
|
||||
int port;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
||||
void setHome(String home) {
|
||||
this.home = home;
|
||||
}
|
||||
|
||||
String getHome() {
|
||||
return home;
|
||||
}
|
||||
|
||||
void setClusterName(String cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
String getClusterName() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", getClusterName())
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
Settings getTransportSettings() {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", cluster)
|
||||
.put("path.home", getHome())
|
||||
.put("host", host)
|
||||
.put("port", port)
|
||||
.build();
|
||||
}
|
||||
|
||||
void startNode(String id) throws NodeValidationException {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
return clients.get(id);
|
||||
}
|
||||
|
||||
String randomString(int len) {
|
||||
final char[] buf = new char[len];
|
||||
final int n = numbersAndLetters.length - 1;
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
buf[i] = numbersAndLetters[random.nextInt(n)];
|
||||
}
|
||||
return new String(buf);
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
|
||||
Node node = new MockNode(nodeSettings, plugins);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,17 +1,17 @@
|
||||
group = org.xbib
|
||||
name = elx
|
||||
version = 6.3.2.1
|
||||
version = 6.3.2.2
|
||||
profile = default
|
||||
release = 0
|
||||
|
||||
elasticsearch-server.version = 6.3.2.2
|
||||
elasticsearch-server.version = 6.3.2.3
|
||||
log4j.version = 2.11.1
|
||||
xbib-metrics.version = 1.2.0
|
||||
xbib-netty-http.version = 4.1.33.0
|
||||
xbib-netty-http.version = 4.1.35.0
|
||||
|
||||
# test
|
||||
junit.version = 4.12
|
||||
wagon.version = 3.0.0
|
||||
junit.version = 5.4.2
|
||||
wagon.version = 3.3.2
|
||||
asciidoclet.version = 1.6.0.0
|
||||
|
||||
org.gradle.warning.mode = all
|
||||
|
Binary file not shown.
Loading…
Reference in New Issue