Merge branch 'es7102' of alkmene:joerg/elx into es7102
commit afdbe1c7d2
7 changed files with 33 additions and 37 deletions
@@ -47,7 +47,7 @@ public interface AdminClient extends BasicClient {
      * Resolve alias.
      *
      * @param alias the alias
-     * @return the index names behind the alias or an empty list if there is no such index
+     * @return the index names in ordered sequence behind the alias or an empty list if there is no such alias
      */
     List<String> resolveAlias(String alias);

@@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexAction;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
@@ -64,9 +65,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
@@ -107,7 +106,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
         ensureClientIsPresent();
         DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
         client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
-        waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
+        waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
         return this;
     }

@@ -120,10 +119,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
             logger.warn("invalid replica level");
             return this;
         }
-        String index = indexDefinition.getFullIndexName();
-        long maxWaitTime = indexDefinition.getMaxWaitTime();
-        TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit();
-        updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
+        logger.info("update replica level for " + indexDefinition + " to " + level);
+        updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", level,
+                30L, TimeUnit.SECONDS);
+        waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
         return this;
     }

@@ -196,8 +195,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
         if (indexMetadata == null) {
             return Collections.emptyList();
         }
-        return indexMetadata.stream().map(im -> im.getIndex().getName())
-                .sorted().collect(Collectors.toList());
+        return indexMetadata.stream()
+                .map(im -> im.getIndex().getName())
+                .sorted() // important
+                .collect(Collectors.toList());
     }

     @Override
@@ -403,22 +404,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
             return false;
         }
         ensureClientIsPresent();
-        String index = indexDefinition.getFullIndexName();
+        logger.info("force merge of " + indexDefinition);
         ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
-        forceMergeRequest.indices(index);
-        try {
-            client.execute(ForceMergeAction.INSTANCE, forceMergeRequest)
-                    .get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
-            return true;
-        } catch (TimeoutException e) {
-            logger.error("timeout");
-        } catch (ExecutionException e) {
-            logger.error(e.getMessage(), e);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.error(e.getMessage(), e);
+        forceMergeRequest.indices(indexDefinition.getFullIndexName());
+        ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest)
+                .actionGet();
+        if (forceMergeResponse.getFailedShards() > 0) {
+            throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
         }
-        return false;
+        waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
+        return true;
     }

     @Override

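Note (editorial sketch, not part of the diff): the rewritten forceMerge() above drops the get()-with-timeout and its catch blocks in favour of a blocking actionGet(), fails loudly when shards fail, and then waits for the cluster to return to GREEN. A minimal standalone version of that flow could look like the following; the class name, method signature and the inlined health check are assumptions, only the calls that appear in the diff are taken from it.

import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.unit.TimeValue;

// Hypothetical class, not part of this commit.
public class ForceMergeSketch {

    public static boolean forceMerge(Client client, String index) {
        // block on the force merge instead of juggling get()-timeouts and catch blocks
        ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
        forceMergeRequest.indices(index);
        ForceMergeResponse forceMergeResponse =
                client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet();
        if (forceMergeResponse.getFailedShards() > 0) {
            throw new IllegalStateException("failed shards after force merge: "
                    + forceMergeResponse.getFailedShards());
        }
        // then wait until the cluster is back to GREEN before reporting success
        ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
                new ClusterHealthRequest()
                        .waitForStatus(ClusterHealthStatus.GREEN)
                        .timeout(TimeValue.timeValueSeconds(300L))).actionGet();
        return !healthResponse.isTimedOut();
    }
}
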
@@ -7,10 +7,6 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
@@ -30,8 +26,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.threadpool.ThreadPoolInfo;
 import org.xbib.elx.api.BasicClient;
 import org.xbib.elx.api.IndexDefinition;
 import java.io.IOException;
@@ -123,6 +117,7 @@ public abstract class AbstractBasicClient implements BasicClient {
     @Override
     public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
         ensureClientIsPresent();
+        logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit);
         ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
         TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
         ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()

@@ -118,8 +118,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
             return;
         }
         // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly.
-        logger.info("waiting for GREEN after index {} was created", index);
-        waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
+        waitForCluster("GREEN", 300L, TimeUnit.SECONDS);
     }

     @Override

@@ -102,7 +102,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
         int interval = indexDefinition.getStartBulkRefreshSeconds();
         if (interval != 0) {
             logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
-            bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
+            bulkClient.updateIndexSetting(indexName, "refresh_interval",
+                    interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
         } else {
             logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
         }
@@ -116,7 +117,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
         if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
             if (interval != 0) {
                 logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
-                bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
+                bulkClient.updateIndexSetting(indexName, "refresh_interval",
+                        interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
             } else {
                 logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
             }

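Note (editorial sketch, not part of the diff): both DefaultBulkProcessor hunks replace the unconditional interval + "s" with a guard, so that a negative interval (for example -1, which disables refreshing) is passed through without the seconds suffix. A self-contained version of that update, using a hypothetical helper name and the stock UpdateSettingsRequest API instead of bulkClient.updateIndexSetting(), might look like this.

import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// Hypothetical class, not part of this commit.
public class RefreshIntervalSketch {

    public static void setRefreshInterval(Client client, String indexName, int interval) {
        // same guard as in the hunks above: only non-negative values get the "s" suffix,
        // a negative value such as -1 ("refresh disabled") is passed through as-is
        String value = interval >= 0 ? interval + "s" : Integer.toString(interval);
        UpdateSettingsRequest request = new UpdateSettingsRequest(indexName)
                .settings(Settings.builder().put("index.refresh_interval", value));
        client.execute(UpdateSettingsAction.INSTANCE, request).actionGet();
    }
}
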
@@ -62,7 +62,7 @@ class AliasTest {
         long t1 = (System.nanoTime() - t0) / 1000000;
         logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
         assertTrue(t1 >= 0);
-        client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest());
+        client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForGreenStatus());
     }

     @Test
@@ -76,7 +76,11 @@ class AliasTest {
         indexRequest = new CreateIndexRequest("test20160103");
         client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet();
         IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
-        String[] indices = new String[] { "test20160101", "test20160102", "test20160103" };
+        String[] indices = {
+                "test20160101",
+                "test20160102",
+                "test20160103"
+        };
         String[] aliases = new String[] { alias };
         IndicesAliasesRequest.AliasActions aliasAction =
                 new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
@@ -89,6 +93,7 @@ class AliasTest {
         GetAliasesResponse getAliasesResponse =
                 client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
         Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
+        // reverse order
         Set<String> result = new TreeSet<>(Collections.reverseOrder());
         for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
             Matcher m = pattern.matcher(indexName.value);

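Note (editorial sketch, not part of the diff): the AliasTest change swaps a fire-and-forget health call for one that blocks until the cluster is GREEN before the alias assertions run. Outside the test, the same idea reads roughly as follows; the class and method names are made up for illustration.

import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;

// Hypothetical class, not part of this commit.
public class WaitForGreenSketch {

    public static void waitForGreen(Client client) {
        // waitForGreenStatus() makes the health call block (up to the request timeout)
        // until the cluster reports GREEN, rather than returning the current status immediately
        ClusterHealthResponse response = client.execute(ClusterHealthAction.INSTANCE,
                new ClusterHealthRequest().waitForGreenStatus()).actionGet();
        if (response.isTimedOut()) {
            throw new IllegalStateException("cluster did not reach GREEN status in time");
        }
    }
}
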
@@ -1,6 +1,6 @@
 group = org.xbib
 name = elx
-version = 7.10.2.1
+version = 7.10.2.2

 gradle.wrapper.version = 6.6.1
 xbib-metrics.version = 2.1.0