From 2ca3b3e9ee78434e174faf72ab8faf22e3a60485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 21 Apr 2021 09:31:03 +0200 Subject: [PATCH] wait 5 minutes for cluster health after index replica and force merge to not confuse subsequent bulk indexing with yelloe health --- .../java/org/xbib/elx/api/AdminClient.java | 2 +- .../xbib/elx/common/AbstractAdminClient.java | 41 ++++++++----------- .../xbib/elx/common/AbstractBasicClient.java | 7 +--- .../xbib/elx/common/AbstractBulkClient.java | 3 +- .../xbib/elx/common/DefaultBulkProcessor.java | 6 ++- .../org/xbib/elx/common/test/AliasTest.java | 9 +++- gradle.properties | 2 +- 7 files changed, 33 insertions(+), 37 deletions(-) diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index b58bfb8..0c5e71a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -47,7 +47,7 @@ public interface AdminClient extends BasicClient { * Resolve alias. * * @param alias the alias - * @return the index names behind the alias or an empty list if there is no such index + * @return the index names in ordered sequence behind the alias or an empty list if there is no such alias */ List resolveAlias(String alias); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 3083fdf..9e2839b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -64,9 +65,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -107,7 +106,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements ensureClientIsPresent(); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); - waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); + waitForCluster("GREEN", 300L, TimeUnit.SECONDS); return this; } @@ -120,10 +119,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements logger.warn("invalid replica level"); return this; } - String index = indexDefinition.getFullIndexName(); - long maxWaitTime = indexDefinition.getMaxWaitTime(); - TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); - updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); + logger.info("update replica level for " + indexDefinition + " to " + level); + updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", level, + 30L, TimeUnit.SECONDS); + waitForCluster("GREEN", 300L, TimeUnit.SECONDS); return this; } @@ -196,8 +195,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (indexMetadata == null) { return Collections.emptyList(); } - return indexMetadata.stream().map(im -> im.getIndex().getName()) - .sorted().collect(Collectors.toList()); + return indexMetadata.stream() + .map(im -> im.getIndex().getName()) + .sorted() // important + .collect(Collectors.toList()); } @Override @@ -403,22 +404,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return false; } ensureClientIsPresent(); - String index = indexDefinition.getFullIndexName(); + logger.info("force merge of " + indexDefinition); ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); - forceMergeRequest.indices(index); - try { - client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) - .get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); - return true; - } catch (TimeoutException e) { - logger.error("timeout"); - } catch (ExecutionException e) { - logger.error(e.getMessage(), e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error(e.getMessage(), e); + forceMergeRequest.indices(indexDefinition.getFullIndexName()); + ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) + .actionGet(); + if (forceMergeResponse.getFailedShards() > 0) { + throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); } - return false; + waitForCluster("GREEN", 300L, TimeUnit.SECONDS); + return true; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 8e1a373..85137c4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -7,10 +7,6 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; @@ -30,8 +26,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPoolInfo; import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; @@ -123,6 +117,7 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); + logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 3a5f656..ea84199 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -118,8 +118,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements return; } // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly. - logger.info("waiting for GREEN after index {} was created", index); - waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); + waitForCluster("GREEN", 300L, TimeUnit.SECONDS); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 9c62240..aff75ac 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -102,7 +102,8 @@ public class DefaultBulkProcessor implements BulkProcessor { int interval = indexDefinition.getStartBulkRefreshSeconds(); if (interval != 0) { logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); + bulkClient.updateIndexSetting(indexName, "refresh_interval", + interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); } else { logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } @@ -116,7 +117,8 @@ public class DefaultBulkProcessor implements BulkProcessor { if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { if (interval != 0) { logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); + bulkClient.updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); } else { logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); } diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java index 1f2bc65..4654236 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java @@ -62,7 +62,7 @@ class AliasTest { long t1 = (System.nanoTime() - t0) / 1000000; logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); assertTrue(t1 >= 0); - client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest()); + client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForGreenStatus()); } @Test @@ -76,7 +76,11 @@ class AliasTest { indexRequest = new CreateIndexRequest("test20160103"); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); - String[] indices = new String[] { "test20160101", "test20160102", "test20160103" }; + String[] indices = { + "test20160101", + "test20160102", + "test20160103" + }; String[] aliases = new String[] { alias }; IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD) @@ -89,6 +93,7 @@ class AliasTest { GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); + // reverse order Set result = new TreeSet<>(Collections.reverseOrder()); for (ObjectCursor indexName : getAliasesResponse.getAliases().keys()) { Matcher m = pattern.matcher(indexName.value); diff --git a/gradle.properties b/gradle.properties index 755a55a..9fd255f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 7.10.2.1 +version = 7.10.2.2 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0