add helper classes to share same Elasticsearch low level client

This commit is contained in:
Jörg Prante 2020-05-23 21:42:19 +02:00
parent 794446534c
commit 66df115579
9 changed files with 173 additions and 207 deletions

View file

@ -1,64 +1,26 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
public class NodeAdminClient extends AbstractAdminClient {
private static final Logger logger = LogManager.getLogger(NodeAdminClient.class.getName());
private final NodeClientHelper helper;
private Node node;
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
if (settings != null) {
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build();
logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
try {
node.start();
} catch (Exception e) {
throw new IOException(e);
}
return node.client();
}
return null;
public NodeAdminClient() {
this.helper = new NodeClientHelper();
}
@Override
protected void closeClient() {
if (node != null) {
logger.debug("closing node...");
node.close();
}
public ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings, null);
}
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, Version.CURRENT, classpathPlugins);
}
@Override
public void closeClient() {
helper.closeClient();
}
}

View file

@ -1,68 +1,25 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.xbib.elx.common.AbstractBulkClient;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
public class NodeBulkClient extends AbstractBulkClient {
private static final Logger logger = LogManager.getLogger(NodeBulkClient.class.getName());
private final NodeClientHelper helper;
private Node node;
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
if (settings == null) {
return null;
}
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build();
XContentBuilder builder = XContentFactory.jsonBuilder();
effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
logger.info("creating node client on {} with effective settings {}",
version, builder.string());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
try {
node.start();
} catch (Exception e) {
throw new IOException(e);
}
return node.client();
public NodeBulkClient() {
this.helper = new NodeClientHelper();
}
@Override
protected void closeClient() throws IOException {
if (node != null) {
logger.debug("closing node client");
node.close();
}
public ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings, null);
}
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, Version.CURRENT, classpathPlugins);
}
@Override
public void closeClient() {
helper.closeClient();
}
}

View file

@ -0,0 +1,73 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.Collection;
import java.util.Collections;
public class NodeClientHelper {
private static final Logger logger = LogManager.getLogger(NodeClientHelper.class.getName());
private static Node node;
private static ElasticsearchClient client;
private static Object configurationObject;
private final Object lock = new Object();
public ElasticsearchClient createClient(Settings settings, Object object) {
if (configurationObject == null) {
configurationObject = object;
}
if (configurationObject instanceof ElasticsearchClient) {
return (ElasticsearchClient) configurationObject;
}
if (client == null) {
synchronized (lock) {
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build();
logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
node = new BulkNode(new Environment(effectiveSettings), plugins);
node.start();
client = node.client();
}
}
return client;
}
public void closeClient() {
synchronized (lock) {
if (client != null) {
logger.debug("closing node...");
node.close();
node = null;
client = null;
}
}
}
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, Version.CURRENT, classpathPlugins);
}
}
}

View file

@ -1,68 +1,25 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
public class NodeSearchClient extends AbstractSearchClient {
private static final Logger logger = LogManager.getLogger(NodeSearchClient.class.getName());
private final NodeClientHelper helper;
private Node node;
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
if (settings == null) {
return null;
}
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build();
XContentBuilder builder = XContentFactory.jsonBuilder();
effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
logger.info("creating node client on {} with effective settings {}",
version, builder.string());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
try {
node.start();
} catch (Exception e) {
throw new IOException(e);
}
return node.client();
public NodeSearchClient() {
this.helper = new NodeClientHelper();
}
@Override
protected void closeClient() throws IOException {
if (node != null) {
logger.debug("closing node client");
node.close();
}
public ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings, null);
}
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, Version.CURRENT, classpathPlugins);
}
@Override
public void closeClient() {
helper.closeClient();
}
}

View file

@ -19,17 +19,8 @@ public class TransportAdminClient extends AbstractAdminClient {
}
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings);
}
@Override
protected void closeClient() {
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
client.close();
client.threadPool().shutdown();
}
public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null);
}
@Override
@ -37,4 +28,9 @@ public class TransportAdminClient extends AbstractAdminClient {
super.init(settings);
helper.init((TransportClient) getClient(), settings);
}
@Override
public void closeClient() {
helper.closeClient();
}
}

View file

@ -18,17 +18,8 @@ public class TransportBulkClient extends AbstractBulkClient {
}
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings);
}
@Override
protected void closeClient() {
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
client.close();
client.threadPool().shutdown();
}
public ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings, null);
}
@Override
@ -36,4 +27,9 @@ public class TransportBulkClient extends AbstractBulkClient {
super.init(settings);
helper.init((TransportClient) getClient(), settings);
}
@Override
public void closeClient() {
helper.closeClient();
}
}

View file

@ -6,6 +6,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
@ -24,34 +25,62 @@ import java.util.List;
public class TransportClientHelper {
private static final Logger logger = LogManager.getLogger(TransportAdminClient.class.getName());
private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName());
protected ElasticsearchClient createClient(Settings settings) {
if (settings != null) {
String systemIdentifier = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();
Settings effectiveSettings = Settings.builder()
// for thread pool size
.put("processors",
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.sniff", false) // do not sniff
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
// custom settings may override defaults
.put(settings)
.build();
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
private static ElasticsearchClient client;
// we need to disable dead lock check because we may have mixed node/transport clients
DefaultChannelFuture.setUseDeadLockChecker(false);
return TransportClient.builder().settings(effectiveSettings).build();
private static Object configurationObject;
private final Object lock = new Object();
public ElasticsearchClient createClient(Settings settings, Object object) {
if (configurationObject == null && object != null) {
configurationObject = object;
}
if (configurationObject instanceof ElasticsearchClient) {
return (ElasticsearchClient) configurationObject;
}
if (client == null) {
synchronized (lock) {
String systemIdentifier = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();
Settings effectiveSettings = Settings.builder()
// for thread pool size
.put("processors",
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.sniff", false) // do not sniff
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
// custom settings may override defaults
.put(settings)
.build();
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
// we need to disable dead lock check because we may have mixed node/transport clients
DefaultChannelFuture.setUseDeadLockChecker(false);
client = TransportClient.builder().settings(effectiveSettings).build();
}
}
return client;
}
public void closeClient() {
synchronized (lock) {
if (client != null) {
if (client instanceof Client) {
((Client) client).close();
}
if (client != null) {
client.threadPool().shutdownNow();
}
client = null;
}
}
return null;
}
public void init(TransportClient transportClient, Settings settings) throws IOException {

View file

@ -18,17 +18,8 @@ public class TransportSearchClient extends AbstractSearchClient {
}
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings);
}
@Override
protected void closeClient() {
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
client.close();
client.threadPool().shutdown();
}
public ElasticsearchClient createClient(Settings settings) throws IOException {
return helper.createClient(settings, null);
}
@Override
@ -36,4 +27,9 @@ public class TransportSearchClient extends AbstractSearchClient {
super.init(settings);
helper.init((TransportClient) getClient(), settings);
}
@Override
public void closeClient() {
helper.closeClient();
}
}

View file

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 2.2.1.17
version = 2.2.1.18
gradle.wrapper.version = 6.4.1
xbib-metrics.version = 2.1.0