From 66df1155792a716a1c7bd57b572b3d5ea9509ea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Sat, 23 May 2020 21:42:19 +0200 Subject: [PATCH] add helper classes to share same Elasticsearch low level client --- .../org/xbib/elx/node/NodeAdminClient.java | 54 ++----------- .../org/xbib/elx/node/NodeBulkClient.java | 59 ++------------ .../org/xbib/elx/node/NodeClientHelper.java | 73 +++++++++++++++++ .../org/xbib/elx/node/NodeSearchClient.java | 59 ++------------ .../elx/transport/TransportAdminClient.java | 18 ++--- .../elx/transport/TransportBulkClient.java | 18 ++--- .../elx/transport/TransportClientHelper.java | 79 +++++++++++++------ .../elx/transport/TransportSearchClient.java | 18 ++--- gradle.properties | 2 +- 9 files changed, 173 insertions(+), 207 deletions(-) create mode 100644 elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index e1715f9..3bf8740 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -1,64 +1,26 @@ package org.xbib.elx.node; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; import org.xbib.elx.common.AbstractAdminClient; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; public class NodeAdminClient extends AbstractAdminClient { - private static final Logger logger = LogManager.getLogger(NodeAdminClient.class.getName()); + private final NodeClientHelper helper; - private Node node; - - @Override - protected ElasticsearchClient createClient(Settings settings) throws IOException { - if (settings != null) { - String version = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.runtime.version") - + " " + System.getProperty("java.vm.version"); - Settings effectiveSettings = Settings.builder().put(settings) - .put("node.client", true) - .put("node.master", false) - .put("node.data", false) - .build(); - logger.info("creating node client on {} with effective settings {}", - version, effectiveSettings.getAsMap()); - Collection> plugins = Collections.emptyList(); - this.node = new BulkNode(new Environment(effectiveSettings), plugins); - try { - node.start(); - } catch (Exception e) { - throw new IOException(e); - } - return node.client(); - } - return null; + public NodeAdminClient() { + this.helper = new NodeClientHelper(); } @Override - protected void closeClient() { - if (node != null) { - logger.debug("closing node..."); - node.close(); - } + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings, null); } - private static class BulkNode extends Node { - - BulkNode(Environment env, Collection> classpathPlugins) { - super(env, Version.CURRENT, classpathPlugins); - } + @Override + public void closeClient() { + helper.closeClient(); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index 6b01fcc..3b285e4 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -1,68 +1,25 @@ package org.xbib.elx.node; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.env.Environment; -import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; import org.xbib.elx.common.AbstractBulkClient; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; public class NodeBulkClient extends AbstractBulkClient { - private static final Logger logger = LogManager.getLogger(NodeBulkClient.class.getName()); + private final NodeClientHelper helper; - private Node node; - - @Override - protected ElasticsearchClient createClient(Settings settings) throws IOException { - if (settings == null) { - return null; - } - String version = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.runtime.version") - + " " + System.getProperty("java.vm.version"); - Settings effectiveSettings = Settings.builder().put(settings) - .put("node.client", true) - .put("node.master", false) - .put("node.data", false) - .build(); - XContentBuilder builder = XContentFactory.jsonBuilder(); - effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true"))); - logger.info("creating node client on {} with effective settings {}", - version, builder.string()); - Collection> plugins = Collections.emptyList(); - this.node = new BulkNode(new Environment(effectiveSettings), plugins); - try { - node.start(); - } catch (Exception e) { - throw new IOException(e); - } - return node.client(); + public NodeBulkClient() { + this.helper = new NodeClientHelper(); } @Override - protected void closeClient() throws IOException { - if (node != null) { - logger.debug("closing node client"); - node.close(); - } + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings, null); } - private static class BulkNode extends Node { - - BulkNode(Environment env, Collection> classpathPlugins) { - super(env, Version.CURRENT, classpathPlugins); - } + @Override + public void closeClient() { + helper.closeClient(); } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java new file mode 100644 index 0000000..eebb1a9 --- /dev/null +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -0,0 +1,73 @@ +package org.xbib.elx.node; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import java.util.Collection; +import java.util.Collections; + +public class NodeClientHelper { + + private static final Logger logger = LogManager.getLogger(NodeClientHelper.class.getName()); + + private static Node node; + + private static ElasticsearchClient client; + + private static Object configurationObject; + + private final Object lock = new Object(); + + public ElasticsearchClient createClient(Settings settings, Object object) { + if (configurationObject == null) { + configurationObject = object; + } + if (configurationObject instanceof ElasticsearchClient) { + return (ElasticsearchClient) configurationObject; + } + if (client == null) { + synchronized (lock) { + String version = System.getProperty("os.name") + + " " + System.getProperty("java.vm.name") + + " " + System.getProperty("java.vm.vendor") + + " " + System.getProperty("java.runtime.version") + + " " + System.getProperty("java.vm.version"); + Settings effectiveSettings = Settings.builder().put(settings) + .put("node.client", true) + .put("node.master", false) + .put("node.data", false) + .build(); + logger.info("creating node client on {} with effective settings {}", + version, effectiveSettings.getAsMap()); + Collection> plugins = Collections.emptyList(); + node = new BulkNode(new Environment(effectiveSettings), plugins); + node.start(); + client = node.client(); + } + } + return client; + } + + public void closeClient() { + synchronized (lock) { + if (client != null) { + logger.debug("closing node..."); + node.close(); + node = null; + client = null; + } + } + } + + private static class BulkNode extends Node { + + BulkNode(Environment env, Collection> classpathPlugins) { + super(env, Version.CURRENT, classpathPlugins); + } + } +} diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index e6fa89a..53c70ac 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -1,68 +1,25 @@ package org.xbib.elx.node; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.env.Environment; -import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; import org.xbib.elx.common.AbstractSearchClient; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; public class NodeSearchClient extends AbstractSearchClient { - private static final Logger logger = LogManager.getLogger(NodeSearchClient.class.getName()); + private final NodeClientHelper helper; - private Node node; - - @Override - protected ElasticsearchClient createClient(Settings settings) throws IOException { - if (settings == null) { - return null; - } - String version = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.runtime.version") - + " " + System.getProperty("java.vm.version"); - Settings effectiveSettings = Settings.builder().put(settings) - .put("node.client", true) - .put("node.master", false) - .put("node.data", false) - .build(); - XContentBuilder builder = XContentFactory.jsonBuilder(); - effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true"))); - logger.info("creating node client on {} with effective settings {}", - version, builder.string()); - Collection> plugins = Collections.emptyList(); - this.node = new BulkNode(new Environment(effectiveSettings), plugins); - try { - node.start(); - } catch (Exception e) { - throw new IOException(e); - } - return node.client(); + public NodeSearchClient() { + this.helper = new NodeClientHelper(); } @Override - protected void closeClient() throws IOException { - if (node != null) { - logger.debug("closing node client"); - node.close(); - } + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings, null); } - private static class BulkNode extends Node { - - BulkNode(Environment env, Collection> classpathPlugins) { - super(env, Version.CURRENT, classpathPlugins); - } + @Override + public void closeClient() { + helper.closeClient(); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index 3987f63..7ceb074 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -19,17 +19,8 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - protected ElasticsearchClient createClient(Settings settings) throws IOException { - return helper.createClient(settings); - } - - @Override - protected void closeClient() { - if (getClient() != null) { - TransportClient client = (TransportClient) getClient(); - client.close(); - client.threadPool().shutdown(); - } + public ElasticsearchClient createClient(Settings settings) { + return helper.createClient(settings, null); } @Override @@ -37,4 +28,9 @@ public class TransportAdminClient extends AbstractAdminClient { super.init(settings); helper.init((TransportClient) getClient(), settings); } + + @Override + public void closeClient() { + helper.closeClient(); + } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index ebc8621..7b1d58b 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -18,17 +18,8 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - protected ElasticsearchClient createClient(Settings settings) throws IOException { - return helper.createClient(settings); - } - - @Override - protected void closeClient() { - if (getClient() != null) { - TransportClient client = (TransportClient) getClient(); - client.close(); - client.threadPool().shutdown(); - } + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings, null); } @Override @@ -36,4 +27,9 @@ public class TransportBulkClient extends AbstractBulkClient { super.init(settings); helper.init((TransportClient) getClient(), settings); } + + @Override + public void closeClient() { + helper.closeClient(); + } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 53147c2..3802c67 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -6,6 +6,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.TransportClient; @@ -24,34 +25,62 @@ import java.util.List; public class TransportClientHelper { - private static final Logger logger = LogManager.getLogger(TransportAdminClient.class.getName()); + private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName()); - protected ElasticsearchClient createClient(Settings settings) { - if (settings != null) { - String systemIdentifier = System.getProperty("os.name") - + " " + System.getProperty("java.vm.name") - + " " + System.getProperty("java.vm.vendor") - + " " + System.getProperty("java.vm.version") - + " Elasticsearch " + Version.CURRENT.toString(); - Settings effectiveSettings = Settings.builder() - // for thread pool size - .put("processors", - settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) - .put("client.transport.sniff", false) // do not sniff - .put("client.transport.nodes_sampler_interval", "1m") // do not ping - .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect - .put("client.transport.ignore_cluster_name", true) // connect to any cluster - // custom settings may override defaults - .put(settings) - .build(); - logger.info("creating transport client on {} with custom settings {} and effective settings {}", - systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); + private static ElasticsearchClient client; - // we need to disable dead lock check because we may have mixed node/transport clients - DefaultChannelFuture.setUseDeadLockChecker(false); - return TransportClient.builder().settings(effectiveSettings).build(); + private static Object configurationObject; + + private final Object lock = new Object(); + + public ElasticsearchClient createClient(Settings settings, Object object) { + if (configurationObject == null && object != null) { + configurationObject = object; + } + if (configurationObject instanceof ElasticsearchClient) { + return (ElasticsearchClient) configurationObject; + } + if (client == null) { + synchronized (lock) { + String systemIdentifier = System.getProperty("os.name") + + " " + System.getProperty("java.vm.name") + + " " + System.getProperty("java.vm.vendor") + + " " + System.getProperty("java.vm.version") + + " Elasticsearch " + Version.CURRENT.toString(); + Settings effectiveSettings = Settings.builder() + // for thread pool size + .put("processors", + settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) + .put("client.transport.sniff", false) // do not sniff + .put("client.transport.nodes_sampler_interval", "1m") // do not ping + .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect + .put("client.transport.ignore_cluster_name", true) // connect to any cluster + // custom settings may override defaults + .put(settings) + .build(); + logger.info("creating transport client on {} with custom settings {} and effective settings {}", + systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); + + // we need to disable dead lock check because we may have mixed node/transport clients + DefaultChannelFuture.setUseDeadLockChecker(false); + client = TransportClient.builder().settings(effectiveSettings).build(); + } + } + return client; + } + + public void closeClient() { + synchronized (lock) { + if (client != null) { + if (client instanceof Client) { + ((Client) client).close(); + } + if (client != null) { + client.threadPool().shutdownNow(); + } + client = null; + } } - return null; } public void init(TransportClient transportClient, Settings settings) throws IOException { diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index 750914c..dbd60e0 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -18,17 +18,8 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - protected ElasticsearchClient createClient(Settings settings) throws IOException { - return helper.createClient(settings); - } - - @Override - protected void closeClient() { - if (getClient() != null) { - TransportClient client = (TransportClient) getClient(); - client.close(); - client.threadPool().shutdown(); - } + public ElasticsearchClient createClient(Settings settings) throws IOException { + return helper.createClient(settings, null); } @Override @@ -36,4 +27,9 @@ public class TransportSearchClient extends AbstractSearchClient { super.init(settings); helper.init((TransportClient) getClient(), settings); } + + @Override + public void closeClient() { + helper.closeClient(); + } } diff --git a/gradle.properties b/gradle.properties index 04cf885..ee6d2fb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.17 +version = 2.2.1.18 gradle.wrapper.version = 6.4.1 xbib-metrics.version = 2.1.0