From c4562bb681bb38ac2419fc2d4ecbd48eed482faf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Sat, 23 May 2020 22:51:03 +0200 Subject: [PATCH] fix locking in open/close logic, allow more than one clients if connected to different clusters --- .../xbib/elx/common/AbstractAdminClient.java | 16 ++-- .../xbib/elx/common/AbstractBulkClient.java | 17 +++-- .../xbib/elx/common/AbstractNativeClient.java | 28 ++++--- .../org/xbib/elx/common/MockAdminClient.java | 2 +- .../org/xbib/elx/common/MockBulkClient.java | 2 +- .../org/xbib/elx/common/MockSearchClient.java | 2 +- .../org/xbib/elx/node/NodeAdminClient.java | 8 +- .../org/xbib/elx/node/NodeBulkClient.java | 7 +- .../org/xbib/elx/node/NodeClientHelper.java | 66 ++++++++-------- .../org/xbib/elx/node/NodeSearchClient.java | 7 +- .../elx/transport/TransportAdminClient.java | 4 +- .../elx/transport/TransportBulkClient.java | 6 +- .../elx/transport/TransportClientHelper.java | 76 +++++++++---------- .../elx/transport/TransportSearchClient.java | 4 +- 14 files changed, 124 insertions(+), 121 deletions(-) diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index b97cc8e..a1c9f18 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.AliasOrIndex; @@ -53,8 +52,6 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.xbib.elx.api.AdminClient; -import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; @@ -70,11 +67,20 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Matcher; diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 1b03d02..a384554 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class AbstractBulkClient extends AbstractNativeClient implements BulkClient { @@ -37,17 +38,19 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements private BulkController bulkController; + private final AtomicBoolean closed = new AtomicBoolean(true); + @Override public void init(Settings settings) throws IOException { - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); - super.init(settings); - if (bulkMetric == null) { + if (closed.compareAndSet(true, false)) { + super.init(settings); + logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); bulkMetric = new DefaultBulkMetric(); bulkMetric.init(settings); - } - if (bulkController == null) { bulkController = new DefaultBulkController(this, bulkMetric); bulkController.init(settings); + } else { + logger.log(Level.WARN, "not initializing"); } } @@ -70,8 +73,8 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public void close() throws IOException { - ensureClientIsPresent(); if (closed.compareAndSet(false, true)) { + ensureClientIsPresent(); if (bulkMetric != null) { logger.info("closing bulk metric"); bulkMetric.close(); @@ -80,7 +83,7 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements logger.info("closing bulk controller"); bulkController.close(); } - closeClient(); + closeClient(settings); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java index 36c65ab..71de04c 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java @@ -40,14 +40,17 @@ public abstract class AbstractNativeClient implements NativeClient { protected ElasticsearchClient client; - protected final AtomicBoolean closed; + protected Settings settings; - protected AbstractNativeClient() { + private final AtomicBoolean closed; + + public AbstractNativeClient() { closed = new AtomicBoolean(false); } @Override public void setClient(ElasticsearchClient client) { + logger.log(Level.INFO, "setting client = " + client); this.client = client; } @@ -56,16 +59,14 @@ public abstract class AbstractNativeClient implements NativeClient { return client; } - protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; - - protected abstract void closeClient() throws IOException; - - @Override public void init(Settings settings) throws IOException { - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); - if (client == null) { - client = createClient(settings); + if (closed.compareAndSet(false, true)) { + logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); + this.settings = settings; + setClient(createClient(settings)); + } else { + logger.log(Level.WARN, "not initializing"); } } @@ -166,10 +167,14 @@ public abstract class AbstractNativeClient implements NativeClient { public void close() throws IOException { ensureClientIsPresent(); if (closed.compareAndSet(false, true)) { - closeClient(); + closeClient(settings); } } + protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; + + protected abstract void closeClient(Settings settings) throws IOException; + protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { ensureClientIsPresent(); if (index == null) { @@ -217,5 +222,4 @@ public abstract class AbstractNativeClient implements NativeClient { throw new IllegalArgumentException("unknown time unit: " + timeUnit); } } - } diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index bbded5e..ebc1450 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -25,7 +25,7 @@ public class MockAdminClient extends AbstractAdminClient { } @Override - protected void closeClient() { + protected void closeClient(Settings settings) { } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index 6e7e10a..5494d90 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -36,7 +36,7 @@ public class MockBulkClient extends AbstractBulkClient { } @Override - protected void closeClient() { + protected void closeClient(Settings settings) { } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java index d8d56cc..9d182dc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java @@ -34,7 +34,7 @@ public class MockSearchClient extends AbstractSearchClient { } @Override - protected void closeClient() { + protected void closeClient(Settings settings) { } @Override diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index 3bf8740..97a5d24 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -4,8 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractAdminClient; -import java.io.IOException; - public class NodeAdminClient extends AbstractAdminClient { private final NodeClientHelper helper; @@ -15,12 +13,12 @@ public class NodeAdminClient extends AbstractAdminClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @Override - public void closeClient() { - helper.closeClient(); + public void closeClient(Settings settings) { + helper.closeClient(settings); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index 3b285e4..019fab1 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -3,7 +3,6 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractBulkClient; -import java.io.IOException; public class NodeBulkClient extends AbstractBulkClient { @@ -14,12 +13,12 @@ public class NodeBulkClient extends AbstractBulkClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @Override - public void closeClient() { - helper.closeClient(); + public void closeClient(Settings settings) { + helper.closeClient(settings); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index eebb1a9..6f6aef0 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -10,18 +10,18 @@ import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; public class NodeClientHelper { private static final Logger logger = LogManager.getLogger(NodeClientHelper.class.getName()); - private static Node node; - - private static ElasticsearchClient client; - private static Object configurationObject; - private final Object lock = new Object(); + private static Node node; + + private static final Map clientMap = new HashMap<>(); public ElasticsearchClient createClient(Settings settings, Object object) { if (configurationObject == null) { @@ -30,40 +30,38 @@ public class NodeClientHelper { if (configurationObject instanceof ElasticsearchClient) { return (ElasticsearchClient) configurationObject; } - if (client == null) { - synchronized (lock) { - String version = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.runtime.version") - + " " + System.getProperty("java.vm.version"); - Settings effectiveSettings = Settings.builder().put(settings) - .put("node.client", true) - .put("node.master", false) - .put("node.data", false) - .build(); - logger.info("creating node client on {} with effective settings {}", - version, effectiveSettings.getAsMap()); - Collection> plugins = Collections.emptyList(); - node = new BulkNode(new Environment(effectiveSettings), plugins); - node.start(); - client = node.client(); - } - } - return client; + return clientMap.computeIfAbsent(settings.get("cluster.name"), + key -> innerCreateClient(settings)); } - public void closeClient() { - synchronized (lock) { - if (client != null) { - logger.debug("closing node..."); - node.close(); - node = null; - client = null; - } + public void closeClient(Settings settings) { + ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); + if (client != null) { + logger.debug("closing node..."); + node.close(); + node = null; } } + private ElasticsearchClient innerCreateClient(Settings settings) { + String version = System.getProperty("os.name") + + " " + System.getProperty("java.vm.name") + + " " + System.getProperty("java.vm.vendor") + + " " + System.getProperty("java.runtime.version") + + " " + System.getProperty("java.vm.version"); + Settings effectiveSettings = Settings.builder().put(settings) + .put("node.client", true) + .put("node.master", false) + .put("node.data", false) + .build(); + logger.info("creating node client on {} with effective settings {}", + version, effectiveSettings.getAsMap()); + Collection> plugins = Collections.emptyList(); + node = new BulkNode(new Environment(effectiveSettings), plugins); + node.start(); + return node.client(); + } + private static class BulkNode extends Node { BulkNode(Environment env, Collection> classpathPlugins) { diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index 53c70ac..9248ac7 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -3,7 +3,6 @@ package org.xbib.elx.node; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractSearchClient; -import java.io.IOException; public class NodeSearchClient extends AbstractSearchClient { @@ -14,12 +13,12 @@ public class NodeSearchClient extends AbstractSearchClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @Override - public void closeClient() { - helper.closeClient(); + public void closeClient(Settings settings) { + helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index 7ceb074..f06a5d2 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -30,7 +30,7 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public void closeClient() { - helper.closeClient(); + public void closeClient(Settings settings) { + helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index 7b1d58b..a10b22b 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -18,7 +18,7 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } @@ -29,7 +29,7 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public void closeClient() { - helper.closeClient(); + public void closeClient(Settings settings) { + helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 3802c67..cc8531b 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -21,17 +21,17 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class TransportClientHelper { private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName()); - private static ElasticsearchClient client; - private static Object configurationObject; - private final Object lock = new Object(); + private static final Map clientMap = new HashMap<>(); public ElasticsearchClient createClient(Settings settings, Object object) { if (configurationObject == null && object != null) { @@ -40,46 +40,17 @@ public class TransportClientHelper { if (configurationObject instanceof ElasticsearchClient) { return (ElasticsearchClient) configurationObject; } - if (client == null) { - synchronized (lock) { - String systemIdentifier = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.vm.version") - + " Elasticsearch " + Version.CURRENT.toString(); - Settings effectiveSettings = Settings.builder() - // for thread pool size - .put("processors", - settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) - .put("client.transport.sniff", false) // do not sniff - .put("client.transport.nodes_sampler_interval", "1m") // do not ping - .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect - .put("client.transport.ignore_cluster_name", true) // connect to any cluster - // custom settings may override defaults - .put(settings) - .build(); - logger.info("creating transport client on {} with custom settings {} and effective settings {}", - systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); - - // we need to disable dead lock check because we may have mixed node/transport clients - DefaultChannelFuture.setUseDeadLockChecker(false); - client = TransportClient.builder().settings(effectiveSettings).build(); - } - } - return client; + return clientMap.computeIfAbsent(settings.get("cluster.name"), + key -> innerCreateClient(settings)); } - public void closeClient() { - synchronized (lock) { - if (client != null) { - if (client instanceof Client) { - ((Client) client).close(); - } - if (client != null) { - client.threadPool().shutdownNow(); - } - client = null; + public void closeClient(Settings settings) { + ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); + if (client != null) { + if (client instanceof Client) { + ((Client) client).close(); } + client.threadPool().shutdownNow(); } } @@ -143,4 +114,29 @@ public class TransportClientHelper { transportClient.addTransportAddress(discoveryNode.getAddress()); } } + + private ElasticsearchClient innerCreateClient(Settings settings) { + String systemIdentifier = System.getProperty("os.name") + + " " + System.getProperty("java.vm.name") + + " " + System.getProperty("java.vm.vendor") + + " " + System.getProperty("java.vm.version") + + " Elasticsearch " + Version.CURRENT.toString(); + Settings effectiveSettings = Settings.builder() + // for thread pool size + .put("processors", + settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) + .put("client.transport.sniff", false) // do not sniff + .put("client.transport.nodes_sampler_interval", "1m") // do not ping + .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect + .put("client.transport.ignore_cluster_name", true) // connect to any cluster + // custom settings may override defaults + .put(settings) + .build(); + logger.info("creating transport client on {} with custom settings {} and effective settings {}", + systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); + + // we need to disable dead lock check because we may have mixed node/transport clients + DefaultChannelFuture.setUseDeadLockChecker(false); + return TransportClient.builder().settings(effectiveSettings).build(); + } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index dbd60e0..85636f0 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -29,7 +29,7 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - public void closeClient() { - helper.closeClient(); + public void closeClient(Settings settings) { + helper.closeClient(settings); } }