add bulk tuning for translog (async mode) and fast replica change
This commit is contained in:
parent
a691ace2d5
commit
97f460ae98
4 changed files with 29 additions and 5 deletions
|
@ -120,9 +120,23 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount());
|
logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount());
|
||||||
|
int currentReplicaLevel = getReplicaLevel(indexDefinition);
|
||||||
|
logger.info("current replica level for " + indexDefinition + " is " + currentReplicaLevel);
|
||||||
|
if (currentReplicaLevel < indexDefinition.getReplicaCount()) {
|
||||||
|
putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "5", 30L, TimeUnit.SECONDS);
|
||||||
|
putClusterSetting("indices.recovery.max_bytes_per_sec", "2gb", 30L, TimeUnit.SECONDS);
|
||||||
|
logger.info("recovery boost activated");
|
||||||
|
}
|
||||||
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas",
|
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas",
|
||||||
indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS);
|
indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS);
|
||||||
waitForHealthyCluster();
|
waitForHealthyCluster();
|
||||||
|
if (currentReplicaLevel < indexDefinition.getReplicaCount()) {
|
||||||
|
// 2 = default setting
|
||||||
|
putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "2", 30L, TimeUnit.SECONDS);
|
||||||
|
// 20m = default value
|
||||||
|
putClusterSetting("indices.recovery.max_bytes_per_sec", "40mb", 30L, TimeUnit.SECONDS);
|
||||||
|
logger.info("recovery boost deactivated");
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
|
||||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
|
@ -118,7 +119,11 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
updateSettingsBuilder.put(key, value.toString());
|
updateSettingsBuilder.put(key, value.toString());
|
||||||
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
|
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
|
||||||
updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
|
updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
|
||||||
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
ClusterUpdateSettingsResponse clusterUpdateSettingsResponse =
|
||||||
|
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
||||||
|
if (clusterUpdateSettingsResponse.isAcknowledged()) {
|
||||||
|
logger.info("cluster update of " + key + " to " + value + " acknowledged");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,7 +144,8 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
.waitForStatus(status);
|
.waitForStatus(status);
|
||||||
ClusterHealthResponse healthResponse =
|
ClusterHealthResponse healthResponse =
|
||||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
logger.info("got cluster status " + healthResponse.getStatus().name());
|
||||||
|
if (healthResponse.isTimedOut()) {
|
||||||
String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name();
|
String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name();
|
||||||
logger.error(message);
|
logger.error(message);
|
||||||
throw new IllegalStateException(message);
|
throw new IllegalStateException(message);
|
||||||
|
|
|
@ -141,8 +141,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
int interval = indexDefinition.getStartBulkRefreshSeconds();
|
int interval = indexDefinition.getStartBulkRefreshSeconds();
|
||||||
if (interval != 0) {
|
if (interval != 0) {
|
||||||
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
|
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
|
||||||
updateIndexSetting(indexName, "refresh_interval",
|
updateIndexSetting(indexName,
|
||||||
interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
|
"refresh_interval", interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
|
||||||
|
updateIndexSetting(indexName,
|
||||||
|
"index.translog.durability", "async", 30L, TimeUnit.SECONDS);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
|
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
|
||||||
}
|
}
|
||||||
|
@ -168,6 +170,8 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
|
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
|
||||||
updateIndexSetting(indexName, "refresh_interval",
|
updateIndexSetting(indexName, "refresh_interval",
|
||||||
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
|
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
|
||||||
|
updateIndexSetting(indexName,
|
||||||
|
"index.translog.durability", "request", 30L, TimeUnit.SECONDS);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
|
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
group = org.xbib
|
group = org.xbib
|
||||||
name = elx
|
name = elx
|
||||||
version = 7.10.2.6
|
version = 7.10.2.7
|
||||||
|
|
||||||
gradle.wrapper.version = 6.6.1
|
gradle.wrapper.version = 6.6.1
|
||||||
xbib-metrics.version = 2.1.0
|
xbib-metrics.version = 2.1.0
|
||||||
|
|
Loading…
Reference in a new issue