From 37e9ae1ac9c3eda77393049d6306b9e5663da981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Thu, 6 May 2021 18:18:17 +0200 Subject: [PATCH] align with es7102 branch --- .../org/xbib/elx/common/AbstractAdminClient.java | 16 +++++++++++++++- .../org/xbib/elx/common/AbstractBasicClient.java | 15 ++++++++++----- .../xbib/elx/common/AbstractSearchClient.java | 16 +++++----------- .../org/xbib/elx/common/DefaultBulkListener.java | 4 +--- .../java/org/xbib/elx/node/NodeBulkClient.java | 3 +-- .../java/org/xbib/elx/node/NodeClientHelper.java | 6 +++--- .../java/org/xbib/elx/node/test/SmokeTest.java | 2 -- .../xbib/elx/transport/test/BulkClientTest.java | 4 ++-- .../xbib/elx/transport/test/IndexShiftTest.java | 2 +- gradle.properties | 4 ++-- 10 files changed, 40 insertions(+), 32 deletions(-) diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 97fd478..e9a2789 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -127,9 +127,23 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return this; } logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount()); + int currentReplicaLevel = getReplicaLevel(indexDefinition); + logger.info("current replica level for " + indexDefinition + " is " + currentReplicaLevel); + if (currentReplicaLevel < indexDefinition.getReplicaCount()) { + putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "5", 30L, TimeUnit.SECONDS); + putClusterSetting("indices.recovery.max_bytes_per_sec", "2gb", 30L, TimeUnit.SECONDS); + logger.info("recovery boost activated"); + } updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS); waitForHealthyCluster(); + if (currentReplicaLevel < indexDefinition.getReplicaCount()) { + // 2 = default setting + putClusterSetting("cluster.routing.allocation.node_concurrent_recoveries", "2", 30L, TimeUnit.SECONDS); + // 20m = default value + putClusterSetting("indices.recovery.max_bytes_per_sec", "40mb", 30L, TimeUnit.SECONDS); + logger.info("recovery boost deactivated"); + } return this; } @@ -243,7 +257,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements newAliases.add(index); } // move existing aliases - if (oldAliasMap != null) { + if (oldIndex.isPresent() && oldAliasMap != null) { for (Map.Entry entry : oldAliasMap.entrySet()) { String alias = entry.getKey(); String filter = entry.getValue(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index a2694d0..25e8699 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -28,7 +29,6 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolInfo; @@ -86,10 +86,10 @@ public abstract class AbstractBasicClient implements BasicClient { public void close() throws IOException { ensureClientIsPresent(); if (closed.compareAndSet(false, true)) { - closeClient(settings); if (executorService != null) { - executorService.shutdown(); + executorService.shutdownNow(); } + closeClient(settings); } } @@ -126,7 +126,11 @@ public abstract class AbstractBasicClient implements BasicClient { updateSettingsBuilder.put(key, value.toString()); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); - client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); + ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = + client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); + if (clusterUpdateSettingsResponse.isAcknowledged()) { + logger.info("cluster update of " + key + " to " + value + " acknowledged"); + } } @Override @@ -147,7 +151,8 @@ public abstract class AbstractBasicClient implements BasicClient { .waitForStatus(status); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { + logger.info("got cluster status " + healthResponse.getStatus().name()); + if (healthResponse.isTimedOut()) { String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name(); logger.error(message); throw new IllegalStateException(message); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 6c790f1..f77c7d3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -181,20 +181,14 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchMetric.getQueries().inc(); searchMetric.markTotalQueries(1); } - if (initialSearchResponse.getFailedShards() > 0) { - if (searchMetric != null) { + if (searchMetric != null) { + if (initialSearchResponse.getFailedShards() > 0) { searchMetric.getFailedQueries().inc(); - } - } else if (initialSearchResponse.isTimedOut()) { - if (searchMetric != null) { + } else if (initialSearchResponse.isTimedOut()) { searchMetric.getTimeoutQueries().inc(); - } - } else if (initialSearchResponse.getHits().getTotalHits() == 0) { - if (searchMetric != null) { + } else if (initialSearchResponse.getHits().getTotalHits() == 0) { searchMetric.getEmptyQueries().inc(); - } - } else { - if (searchMetric != null) { + } else { searchMetric.getSucceededQueries().inc(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 493a195..68d3170 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -11,7 +11,6 @@ import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; public class DefaultBulkListener implements BulkListener { @@ -28,9 +27,8 @@ public class DefaultBulkListener implements BulkListener { public DefaultBulkListener(DefaultBulkProcessor bulkProcessor, Settings settings) { this.bulkProcessor = bulkProcessor; - boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), + this.failOnError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), Parameters.BULK_FAIL_ON_ERROR.getBoolean()); - this.failOnError = failOnBulkError; if (settings.getAsBoolean(Parameters.BULK_METRIC_ENABLED.getName(), Parameters.BULK_METRIC_ENABLED.getBoolean())) { this.bulkMetric = new DefaultBulkMetric(bulkProcessor, bulkProcessor.getScheduler(), settings); diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index 566d99e..3e74426 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -3,7 +3,6 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractBulkClient; -import java.io.IOException; public class NodeBulkClient extends AbstractBulkClient { @@ -20,7 +19,7 @@ public class NodeBulkClient extends AbstractBulkClient { } @Override - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { helper.closeClient(settings); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index d52f11b..b059f67 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -31,14 +31,14 @@ public class NodeClientHelper { } String clusterName = settings.get("cluster.name", "elasticsearch"); Node node = nodeMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings)); - return node != null ? node.client() : null; + return node.client(); } public void closeClient(Settings settings) { String clusterName = settings.get("cluster.name", "elasticsearch"); - Node node = nodeMap.remove(settings.get("cluster.name")); + Node node = nodeMap.remove(clusterName); if (node != null) { - logger.debug("closing node..."); + logger.debug("closing..."); node.close(); } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 9f4f745..9862aef 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -50,7 +50,6 @@ class SmokeTest { indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.checkMapping(indexDefinition); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); @@ -64,7 +63,6 @@ class SmokeTest { bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); adminClient.updateReplicaLevel(indexDefinition); - assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 624cc70..505bb11 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -123,11 +123,11 @@ class BulkClientTest { logger.error("latch timeout!"); } bulkClient.stopBulk(indexDefinition); + bulkClient.refreshIndex(indexDefinition); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) { assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); } - bulkClient.refreshIndex(indexDefinition); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); if (bulkClient.getBulkProcessor().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index cb9723e..8f87880 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -46,7 +46,7 @@ class IndexShiftTest { .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getClientSettings()) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { diff --git a/gradle.properties b/gradle.properties index ebaf8e9..aa63d03 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,13 +1,13 @@ group = org.xbib name = elx -version = 2.2.1.48 +version = 2.2.1.49 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0 xbib-time.version = 2.1.0 xbib-guice.version = 4.4.2 xbib-guava.version = 28.1 -xbib-netty-http.version = 4.1.63.2 +xbib-netty-http.version = 4.1.63.3 elasticsearch.version = 2.2.1 jackson.version = 2.11.4 jna.version = 5.5.0