wait 5 minutes for cluster health after index replica and force merge to not confuse subsequent bulk indexing with yelloe health
This commit is contained in:
parent
870cc09767
commit
2ca3b3e9ee
7 changed files with 33 additions and 37 deletions
|
@ -47,7 +47,7 @@ public interface AdminClient extends BasicClient {
|
|||
* Resolve alias.
|
||||
*
|
||||
* @param alias the alias
|
||||
* @return the index names behind the alias or an empty list if there is no such index
|
||||
* @return the index names in ordered sequence behind the alias or an empty list if there is no such alias
|
||||
*/
|
||||
List<String> resolveAlias(String alias);
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
|
|||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
|
@ -64,9 +65,7 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.regex.Matcher;
|
||||
|
@ -107,7 +106,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
ensureClientIsPresent();
|
||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
|
||||
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
|
||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||
waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -120,10 +119,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
logger.warn("invalid replica level");
|
||||
return this;
|
||||
}
|
||||
String index = indexDefinition.getFullIndexName();
|
||||
long maxWaitTime = indexDefinition.getMaxWaitTime();
|
||||
TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit();
|
||||
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
|
||||
logger.info("update replica level for " + indexDefinition + " to " + level);
|
||||
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", level,
|
||||
30L, TimeUnit.SECONDS);
|
||||
waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -196,8 +195,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
if (indexMetadata == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return indexMetadata.stream().map(im -> im.getIndex().getName())
|
||||
.sorted().collect(Collectors.toList());
|
||||
return indexMetadata.stream()
|
||||
.map(im -> im.getIndex().getName())
|
||||
.sorted() // important
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -403,22 +404,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
return false;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
String index = indexDefinition.getFullIndexName();
|
||||
logger.info("force merge of " + indexDefinition);
|
||||
ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
|
||||
forceMergeRequest.indices(index);
|
||||
try {
|
||||
client.execute(ForceMergeAction.INSTANCE, forceMergeRequest)
|
||||
.get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
|
||||
return true;
|
||||
} catch (TimeoutException e) {
|
||||
logger.error("timeout");
|
||||
} catch (ExecutionException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error(e.getMessage(), e);
|
||||
forceMergeRequest.indices(indexDefinition.getFullIndexName());
|
||||
ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest)
|
||||
.actionGet();
|
||||
if (forceMergeResponse.getFailedShards() > 0) {
|
||||
throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
|
||||
}
|
||||
return false;
|
||||
waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -7,10 +7,6 @@ import org.elasticsearch.ElasticsearchTimeoutException;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
|
@ -30,8 +26,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolInfo;
|
||||
import org.xbib.elx.api.BasicClient;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import java.io.IOException;
|
||||
|
@ -123,6 +117,7 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
@Override
|
||||
public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureClientIsPresent();
|
||||
logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit);
|
||||
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
|
|
|
@ -118,8 +118,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
return;
|
||||
}
|
||||
// we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly.
|
||||
logger.info("waiting for GREEN after index {} was created", index);
|
||||
waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
|
||||
waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -102,7 +102,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
int interval = indexDefinition.getStartBulkRefreshSeconds();
|
||||
if (interval != 0) {
|
||||
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval",
|
||||
interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
|
||||
} else {
|
||||
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
|
||||
}
|
||||
|
@ -116,7 +117,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
|
||||
if (interval != 0) {
|
||||
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval",
|
||||
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
|
||||
} else {
|
||||
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ class AliasTest {
|
|||
long t1 = (System.nanoTime() - t0) / 1000000;
|
||||
logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
|
||||
assertTrue(t1 >= 0);
|
||||
client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest());
|
||||
client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForGreenStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -76,7 +76,11 @@ class AliasTest {
|
|||
indexRequest = new CreateIndexRequest("test20160103");
|
||||
client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet();
|
||||
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
||||
String[] indices = new String[] { "test20160101", "test20160102", "test20160103" };
|
||||
String[] indices = {
|
||||
"test20160101",
|
||||
"test20160102",
|
||||
"test20160103"
|
||||
};
|
||||
String[] aliases = new String[] { alias };
|
||||
IndicesAliasesRequest.AliasActions aliasAction =
|
||||
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
|
||||
|
@ -89,6 +93,7 @@ class AliasTest {
|
|||
GetAliasesResponse getAliasesResponse =
|
||||
client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
|
||||
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
|
||||
// reverse order
|
||||
Set<String> result = new TreeSet<>(Collections.reverseOrder());
|
||||
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
|
||||
Matcher m = pattern.matcher(indexName.value);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
group = org.xbib
|
||||
name = elx
|
||||
version = 7.10.2.1
|
||||
version = 7.10.2.2
|
||||
|
||||
gradle.wrapper.version = 6.6.1
|
||||
xbib-metrics.version = 2.1.0
|
||||
|
|
Loading…
Reference in a new issue