From 97f460ae98126151260e0bd7cf8019d67d6808bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 4 May 2021 18:32:57 +0200 Subject: [PATCH] add bulk tuning for translog (async mode) and fast replica change --- .../org/xbib/elx/common/AbstractAdminClient.java | 14 ++++++++++++++ .../org/xbib/elx/common/AbstractBasicClient.java | 10 ++++++++-- .../org/xbib/elx/common/AbstractBulkClient.java | 8 ++++++-- gradle.properties | 2 +- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index d12815e..3c4cf31 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -120,9 +120,23 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return this; } logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount()); + int currentReplicaLevel = getReplicaLevel(indexDefinition); + logger.info("current replica level for " + indexDefinition + " is " + currentReplicaLevel); + if (currentReplicaLevel < indexDefinition.getReplicaCount()) { + putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "5", 30L, TimeUnit.SECONDS); + putClusterSetting("indices.recovery.max_bytes_per_sec", "2gb", 30L, TimeUnit.SECONDS); + logger.info("recovery boost activated"); + } updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS); waitForHealthyCluster(); + if (currentReplicaLevel < indexDefinition.getReplicaCount()) { + // 2 = default setting + putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "2", 30L, TimeUnit.SECONDS); + // 40mb = default value + putClusterSetting("indices.recovery.max_bytes_per_sec", "40mb", 30L, TimeUnit.SECONDS); + 
logger.info("recovery boost deactivated"); + } return this; } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 2d6cccf..6fc3024 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -118,7 +119,11 @@ public abstract class AbstractBasicClient implements BasicClient { updateSettingsBuilder.put(key, value.toString()); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); - client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); + ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = + client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); + if (clusterUpdateSettingsResponse.isAcknowledged()) { + logger.info("cluster update of " + key + " to " + value + " acknowledged"); + } } @Override @@ -139,7 +144,8 @@ public abstract class AbstractBasicClient implements BasicClient { .waitForStatus(status); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - 
if (healthResponse != null && healthResponse.isTimedOut()) { + logger.info("got cluster status " + healthResponse.getStatus().name()); + if (healthResponse.isTimedOut()) { String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name(); logger.error(message); throw new IllegalStateException(message); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 3e0ce7b..a31d151 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -141,8 +141,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements int interval = indexDefinition.getStartBulkRefreshSeconds(); if (interval != 0) { logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); - updateIndexSetting(indexName, "refresh_interval", - interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + updateIndexSetting(indexName, + "refresh_interval", interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + updateIndexSetting(indexName, + "index.translog.durability", "async", 30L, TimeUnit.SECONDS); } else { logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } @@ -168,6 +170,8 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); updateIndexSetting(indexName, "refresh_interval", interval >= 0 ? 
interval + "s" : interval, 30L, TimeUnit.SECONDS); + updateIndexSetting(indexName, + "index.translog.durability", "request", 30L, TimeUnit.SECONDS); } else { logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); } diff --git a/gradle.properties b/gradle.properties index 0a90d6e..e081336 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 7.10.2.6 +version = 7.10.2.7 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0