Merge branch 'es221' of alkmene:joerg/elx into es221

This commit is contained in:
Jörg Prante 2021-05-14 15:17:05 +02:00
commit 7007ddb465
9 changed files with 38 additions and 30 deletions

View file

@ -127,9 +127,23 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return this;
}
logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount());
int currentReplicaLevel = getReplicaLevel(indexDefinition);
logger.info("current replica level for " + indexDefinition + " is " + currentReplicaLevel);
if (currentReplicaLevel < indexDefinition.getReplicaCount()) {
putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "5", 30L, TimeUnit.SECONDS);
putClusterSetting("indices.recovery.max_bytes_per_sec", "2gb", 30L, TimeUnit.SECONDS);
logger.info("recovery boost activated");
}
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas",
indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS);
waitForHealthyCluster();
if (currentReplicaLevel < indexDefinition.getReplicaCount()) {
// 2 = default setting
putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "2", 30L, TimeUnit.SECONDS);
// 20m = default value
putClusterSetting("indices.recovery.max_bytes_per_sec", "40mb", 30L, TimeUnit.SECONDS);
logger.info("recovery boost deactivated");
}
return this;
}
@ -243,7 +257,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
newAliases.add(index);
}
// move existing aliases
if (oldAliasMap != null) {
if (oldIndex.isPresent() && oldAliasMap != null) {
for (Map.Entry<String, String> entry : oldAliasMap.entrySet()) {
String alias = entry.getKey();
String filter = entry.getValue();

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -28,7 +29,6 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolInfo;
@ -86,10 +86,10 @@ public abstract class AbstractBasicClient implements BasicClient {
public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) {
closeClient(settings);
if (executorService != null) {
executorService.shutdown();
executorService.shutdownNow();
}
closeClient(settings);
}
}
@ -126,7 +126,11 @@ public abstract class AbstractBasicClient implements BasicClient {
updateSettingsBuilder.put(key, value.toString());
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
ClusterUpdateSettingsResponse clusterUpdateSettingsResponse =
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
if (clusterUpdateSettingsResponse.isAcknowledged()) {
logger.info("cluster update of " + key + " to " + value + " acknowledged");
}
}
@Override
@ -147,7 +151,8 @@ public abstract class AbstractBasicClient implements BasicClient {
.waitForStatus(status);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
logger.info("got cluster status " + healthResponse.getStatus().name());
if (healthResponse.isTimedOut()) {
String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name();
logger.error(message);
throw new IllegalStateException(message);

View file

@ -181,20 +181,14 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
}
if (initialSearchResponse.getFailedShards() > 0) {
if (searchMetric != null) {
if (searchMetric != null) {
if (initialSearchResponse.getFailedShards() > 0) {
searchMetric.getFailedQueries().inc();
}
} else if (initialSearchResponse.isTimedOut()) {
if (searchMetric != null) {
} else if (initialSearchResponse.isTimedOut()) {
searchMetric.getTimeoutQueries().inc();
}
} else if (initialSearchResponse.getHits().getTotalHits() == 0) {
if (searchMetric != null) {
} else if (initialSearchResponse.getHits().getTotalHits() == 0) {
searchMetric.getEmptyQueries().inc();
}
} else {
if (searchMetric != null) {
} else {
searchMetric.getSucceededQueries().inc();
}
}

View file

@ -11,7 +11,6 @@ import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
public class DefaultBulkListener implements BulkListener {
@ -28,9 +27,8 @@ public class DefaultBulkListener implements BulkListener {
public DefaultBulkListener(DefaultBulkProcessor bulkProcessor,
Settings settings) {
this.bulkProcessor = bulkProcessor;
boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
this.failOnError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
Parameters.BULK_FAIL_ON_ERROR.getBoolean());
this.failOnError = failOnBulkError;
if (settings.getAsBoolean(Parameters.BULK_METRIC_ENABLED.getName(),
Parameters.BULK_METRIC_ENABLED.getBoolean())) {
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, bulkProcessor.getScheduler(), settings);

View file

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractBulkClient;
import java.io.IOException;
public class NodeBulkClient extends AbstractBulkClient {
@ -20,7 +19,7 @@ public class NodeBulkClient extends AbstractBulkClient {
}
@Override
public void closeClient(Settings settings) throws IOException {
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

View file

@ -31,14 +31,14 @@ public class NodeClientHelper {
}
String clusterName = settings.get("cluster.name", "elasticsearch");
Node node = nodeMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
return node != null ? node.client() : null;
return node.client();
}
public void closeClient(Settings settings) {
String clusterName = settings.get("cluster.name", "elasticsearch");
Node node = nodeMap.remove(settings.get("cluster.name"));
Node node = nodeMap.remove(clusterName);
if (node != null) {
logger.debug("closing node...");
logger.debug("closing...");
node.close();
}
}

View file

@ -50,7 +50,6 @@ class SmokeTest {
indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
bulkClient.flush();
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.checkMapping(indexDefinition);
bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}");
@ -64,7 +63,6 @@ class SmokeTest {
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());

View file

@ -123,11 +123,11 @@ class BulkClientTest {
logger.error("latch timeout!");
}
bulkClient.stopBulk(indexDefinition);
bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
}
bulkClient.refreshIndex(indexDefinition);
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}

View file

@ -46,7 +46,7 @@ class IndexShiftTest {
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getClientSettings())
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift");
bulkClient.newIndex(indexDefinition);
for (int i = 0; i < 1; i++) {