diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index fe0ead6..9999eea 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -3,13 +3,18 @@ package org.xbib.elx.api; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import java.io.Closeable; -import java.io.IOException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { - void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; + /** + * Initiative the client + * @param settings settings + */ + void init(Settings settings); + + void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); /** * Set an Elasticsearch client to extend from it. May be null for TransportClient. @@ -24,14 +29,6 @@ public interface BasicClient extends Closeable { */ ElasticsearchClient getClient(); - /** - * Initiative the extended client, the bulk metric and bulk controller, - * creates instances and connect to cluster, if required. - * - * @param settings settings - * @throws IOException if init fails - */ - void init(Settings settings) throws IOException; /** * Get cluster name. @@ -50,14 +47,8 @@ public interface BasicClient extends Closeable { /** * Wait for cluster being healthy. - * - * @param healthColor cluster health color to wait for - * @param maxWaitTime time value - * @param timeUnit time unit */ - void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); - - void waitForShards(long maxWaitTime, TimeUnit timeUnit); + void waitForHealthyCluster(); long getSearchableDocs(IndexDefinition indexDefinition); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 8ad0db5..4303132 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -112,7 +112,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); return this; } @@ -129,6 +129,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements long maxWaitTime = indexDefinition.getMaxWaitTime(); TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); + waitForHealthyCluster(); return this; } @@ -409,7 +410,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (forceMergeResponse.getFailedShards() > 0) { throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); } - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); return true; } @@ -421,7 +422,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) .settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index b1429f5..0fb30dd 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -76,10 +76,10 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { + this.settings = settings; if (closed.compareAndSet(false, true)) { logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); - this.settings = settings; setClient(createClient(settings)); } } @@ -105,13 +105,13 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) { ensureClientIsPresent(); if (key == null) { - throw new IOException("no key given"); + throw new IllegalArgumentException("no key given"); } if (value == null) { - throw new IOException("no value given"); + throw new IllegalArgumentException("no value given"); } Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); @@ -120,26 +120,16 @@ public abstract class AbstractBasicClient implements BasicClient { client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } - protected Long getThreadPoolQueueSize(String name) { - ensureClientIsPresent(); - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.threadPool(true); - NodesInfoResponse nodesInfoResponse = - client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); - for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { - ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool(); - for (ThreadPool.Info info : threadPoolInfo) { - if (info.getName().equals(name)) { - return info.getQueueSize().getSingles(); - } - } - } - return null; - } - @Override - public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { + public void waitForHealthyCluster() { ensureClientIsPresent(); + String statusString = settings.get(Parameters.CLUSTER_TARGET_HEALTH.getName(), + Parameters.CLUSTER_TARGET_HEALTH.getString()); + String waitTimeStr = settings.get(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), + Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getString()); + TimeValue timeValue = TimeValue.parseTimeValue(waitTimeStr, TimeValue.timeValueMinutes(30L), ""); + long maxWaitTime = timeValue.minutes(); + TimeUnit timeUnit = TimeUnit.MINUTES; logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); @@ -155,23 +145,6 @@ public abstract class AbstractBasicClient implements BasicClient { } } - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - ensureClientIsPresent(); - logger.info("waiting for cluster shard settling"); - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - .waitForRelocatingShards(0) - .timeout(timeout); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse.isTimedOut()) { - String message = "timeout waiting for cluster shards: " + timeout; - logger.error(message); - throw new IllegalStateException(message); - } - } - @Override public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); @@ -228,7 +201,7 @@ public abstract class AbstractBasicClient implements BasicClient { } } - protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; + protected abstract ElasticsearchClient createClient(Settings settings); protected abstract void closeClient(Settings settings) throws IOException; @@ -284,4 +257,21 @@ public abstract class AbstractBasicClient implements BasicClient { throw new IllegalArgumentException("unknown time unit: " + timeUnit); } } + + protected Long getThreadPoolQueueSize(String name) { + ensureClientIsPresent(); + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.threadPool(true); + NodesInfoResponse nodesInfoResponse = + client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { + ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool(); + for (ThreadPool.Info info : threadPoolInfo) { + if (info.getName().equals(name)) { + return info.getQueueSize().getSingles(); + } + } + } + return null; + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 14c6ffc..f52f573 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -40,7 +40,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { if (closed.compareAndSet(true, false)) { super.init(settings); bulkProcessor = new DefaultBulkProcessor(this, settings); @@ -114,7 +114,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements logger.warn("index creation of {} not acknowledged", index); return; } - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 86ede0f..8a1a8d1 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -49,7 +49,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { if (closed.compareAndSet(true, false)) { super.init(settings); this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index 6e25fef..66619fd 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -3,8 +3,6 @@ package org.xbib.elx.common; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import java.util.concurrent.TimeUnit; - /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ @@ -28,14 +26,6 @@ public class MockAdminClient extends AbstractAdminClient { protected void closeClient(Settings settings) { } - @Override - public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) { - } - - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } - @Override public void close() { // nothing to do diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index e0e861b..0856fe5 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -26,10 +26,6 @@ public class MockBulkClient extends AbstractBulkClient { return null; } - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } - @Override protected ElasticsearchClient createClient(Settings settings) { return null; diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java index 9d182dc..b4851ad 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java @@ -24,10 +24,6 @@ public class MockSearchClient extends AbstractSearchClient { return null; } - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } - @Override protected ElasticsearchClient createClient(Settings settings) { return null; diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 1dd55bc..30a3d9e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,6 +2,10 @@ package org.xbib.elx.common; public enum Parameters { + CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"), + + CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"), + DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java index d567d6d..c595a31 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java @@ -2,21 +2,12 @@ package org.xbib.elx.common.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -68,23 +59,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft public void beforeEach(ExtensionContext extensionContext) throws Exception { Helper helper = extensionContext.getParent().get().getStore(ns) .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); - logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode(); - try { - ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); - } - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); - ClusterStateResponse clusterStateResponse = - helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); } @Override @@ -197,7 +172,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .put(getNodeSettings()) .put("node.name", "1") .build(); - return new MockNode(nodeSettings); + this.node = new MockNode(nodeSettings); + return node; } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 6de3be9..16b3893 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -35,7 +35,7 @@ class BulkClientTest { void testNewIndex() throws Exception { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -46,7 +46,7 @@ class BulkClientTest { void testSingleDoc() throws Exception { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -65,7 +65,7 @@ class BulkClientTest { long numactions = ACTIONS; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -93,7 +93,7 @@ class BulkClientTest { long timeout = 120L; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index b9f3e02..d5241aa 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -37,7 +37,7 @@ class DuplicateIDTest { long numactions = ACTIONS; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 5560139..bae31c9 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -40,11 +40,11 @@ class IndexPruneTest { void testPrune() throws IOException { try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 49545df..5538f2e 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -38,11 +38,11 @@ class IndexShiftTest { void testIndexShift() throws Exception { try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 6fa4b2b..0e2abf5 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -40,7 +40,7 @@ class SearchTest { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -60,7 +60,7 @@ class SearchTest { } try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) .setSearchClientProvider(NodeSearchClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { // test stream count Stream stream = searchClient.search(qb -> qb diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 1a40185..16890d1 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -36,11 +36,11 @@ class SmokeTest { void smokeTest() throws Exception { try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { assertEquals(helper.getClusterName(), adminClient.getClusterName()); IndexDefinition indexDefinition = @@ -51,22 +51,20 @@ class SmokeTest { indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.checkMapping(indexDefinition); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.delete(indexDefinition, "1"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.updateReplicaLevel(indexDefinition, 2); - int replica = adminClient.getReplicaLevel(indexDefinition); - assertEquals(2, replica); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); + adminClient.updateReplicaLevel(indexDefinition, 1); + assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().getLastBulkError() != null) { diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index 16fa2e0..ab88efa 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -102,7 +102,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Helper create() { Helper helper = new Helper(); - String home = System.getProperty("path.home", "build/elxnode"); + String home = System.getProperty("path.home", "build/elxnode/"); helper.setHome(home + "/" + helper.randomString(8)); helper.setClusterName("test-cluster-" + helper.randomString(8)); logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); @@ -137,11 +137,17 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings() { + Settings getClientSettings() { return Settings.builder() - .put("name", "elx-client") // for threadpool name .put("cluster.name", getClusterName()) .put("path.home", getHome()) + .put("name", getClusterName() + "-name-client") // for threadpool setting + .put("node.name", getClusterName() + "-client") + .put("node.master", "false") + .put("node.data", "false") + .put("node.client", "true") + .put("cluster.target_health", "YELLOW") + .put("cluster.target_health_timeout", "1m") .build(); } @@ -168,10 +174,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } Node buildNode() { - String id = "1"; Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) - .put("node.name", id) + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("name", getClusterName() + "-name-server") // for threadpool setting + .put("node.name", getClusterName() + "-server") + .put("node.master", "true") + .put("node.data", "true") + .put("node.client", "false") .build(); this.node = new MockNode(nodeSettings); return node; diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index 08b3cbc..ff5a88b 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -5,8 +5,6 @@ import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractAdminClient; -import java.io.IOException; - /** * Transport admin client. */ @@ -20,12 +18,12 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings); } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { super.init(settings); helper.init((TransportClient) getClient(), settings); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index b591fc2..f7f9166 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractBulkClient; -import java.io.IOException; /** * Transport search client with additional methods. @@ -19,12 +18,12 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings); } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { super.init(settings); helper.init((TransportClient) getClient(), settings); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 08ee421..9ce152d 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -46,7 +46,7 @@ public class TransportClientHelper { } } - public void init(TransportClient transportClient, Settings settings) throws IOException { + public void init(TransportClient transportClient, Settings settings) { Collection addrs = findAddresses(settings); if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { throw new NoNodeAvailableException("no cluster nodes available, check settings = " @@ -54,7 +54,7 @@ public class TransportClientHelper { } } - private Collection findAddresses(Settings settings) throws IOException { + private Collection findAddresses(Settings settings) { final int defaultPort = settings.getAsInt("port", 9300); Collection addresses = new ArrayList<>(); for (String hostname : settings.getAsArray("host")) { @@ -66,16 +66,20 @@ public class TransportClientHelper { int port = Integer.parseInt(splitHost[1]); TransportAddress address = new InetSocketTransportAddress(inetAddress, port); addresses.add(address); - } catch (NumberFormatException e) { + } catch (IOException e) { logger.warn(e.getMessage(), e); } } else if (splitHost.length == 1) { - String host = splitHost[0]; - InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); - TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); - addresses.add(address); + try { + String host = splitHost[0]; + InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); + TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); + addresses.add(address); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } } else { - throw new IOException("invalid hostname specification: " + hostname); + throw new IllegalArgumentException("invalid hostname specification: " + hostname); } } return addresses; diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index 53bc8a7..eb39534 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractSearchClient; -import java.io.IOException; /** * Transport search client with additional methods. @@ -19,12 +18,12 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings); } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { super.init(settings); helper.init((TransportClient) getClient(), settings); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 76fd9a3..c122e0d 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -35,7 +35,7 @@ class BulkClientTest { void testNewIndex() throws Exception { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -46,7 +46,7 @@ class BulkClientTest { void testSingleDoc() throws Exception { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -65,7 +65,7 @@ class BulkClientTest { long numactions = ACTIONS; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -91,7 +91,7 @@ class BulkClientTest { final long timeout = 120L; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test"); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 437850c..4643a9a 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -32,7 +32,7 @@ class DuplicateIDTest { long numactions = ACTIONS; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index ea509af..1ad1c5e 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -40,11 +40,11 @@ class IndexPruneTest { void testPrune() throws IOException { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 8f1f4c1..e4b39e9 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -40,11 +40,11 @@ class IndexShiftTest { void testIndexShift() throws Exception { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 3d254f6..ea60953 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -42,7 +42,7 @@ class SearchTest { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -62,7 +62,7 @@ class SearchTest { } try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { // test stream count Stream stream = searchClient.search(qb -> qb diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index bb917be..532a22d 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -36,11 +36,11 @@ class SmokeTest { void smokeTest() throws Exception { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); @@ -50,21 +50,20 @@ class SmokeTest { assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.checkMapping(indexDefinition); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.updateReplicaLevel(indexDefinition, 2); - int replica = adminClient.getReplicaLevel(indexDefinition); - assertEquals(2, replica); + assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); + adminClient.updateReplicaLevel(indexDefinition, 1); + assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().getLastBulkError() != null) { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 2bda899..5791ffd 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -2,21 +2,11 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.support.AbstractClient; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -66,10 +56,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Helper helper = extensionContext.getParent().isPresent() ? extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null; Objects.requireNonNull(helper); - logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode(); - helper.greenHealth(); - logger.info("cluster name = {}", helper.clusterName()); } @Override @@ -122,8 +109,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Node node; - AbstractClient client; - void setHome(String home) { this.home = home; } @@ -140,26 +125,21 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings() { - return Settings.builder() - .put("cluster.name", getClusterName()) - .put("path.home", getHome()) - .build(); - } - - Settings getTransportSettings() { + Settings getClientSettings() { return Settings.builder() .put("cluster.name", cluster) .put("path.home", getHome()) .put("host", host) .put("port", port) + .put("cluster.target_health", "YELLOW") + .put("cluster.target_health_timeout", "1m") .build(); } void startNode() { buildNode().start(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = node.client().execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); Object obj = response.iterator().next().getTransport().getAddress() .publishAddress(); if (obj instanceof InetSocketTransportAddress) { @@ -180,48 +160,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } Node buildNode() { - String id = "1"; Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) - .put("node.name", id) + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("name", getClusterName() + "-name-server") // for threadpool setting + .put("node.name", getClusterName() + "-server") + .put("node.master", "true") + .put("node.data", "true") + .put("node.client", "false") .build(); - node = new MockNode(nodeSettings); - client = (AbstractClient) node.client(); + this.node = new MockNode(nodeSettings); return node; } void closeNodes() { - if (client != null) { - logger.info("closing client"); - client.close(); - } if (node != null) { logger.info("closing node"); node.close(); } } - void greenHealth() throws IOException { - try { - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); - } - } - - String clusterName() { - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); - ClusterStateResponse clusterStateResponse = - client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - return clusterStateResponse.getClusterName().value(); - } - private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private static final Random random = new SecureRandom(); diff --git a/gradle.properties b/gradle.properties index 78d430f..2dd2a34 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.44 +version = 2.2.1.45 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0