fix locking in open/close logic, allow more than one clients if connected to different clusters

Jörg Prante 4 years ago
parent 66df115579
commit c4562bb681

@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
@ -53,8 +52,6 @@ import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.xbib.elx.api.AdminClient;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult;
@ -70,11 +67,20 @@ import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Matcher;

@ -28,6 +28,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractBulkClient extends AbstractNativeClient implements BulkClient {
@ -37,17 +38,19 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
private BulkController bulkController;
private final AtomicBoolean closed = new AtomicBoolean(true);
@Override
public void init(Settings settings) throws IOException {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
super.init(settings);
if (bulkMetric == null) {
if (closed.compareAndSet(true, false)) {
super.init(settings);
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
bulkMetric = new DefaultBulkMetric();
bulkMetric.init(settings);
}
if (bulkController == null) {
bulkController = new DefaultBulkController(this, bulkMetric);
bulkController.init(settings);
} else {
logger.log(Level.WARN, "not initializing");
}
}
@ -70,8 +73,8 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
@Override
public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) {
ensureClientIsPresent();
if (bulkMetric != null) {
logger.info("closing bulk metric");
bulkMetric.close();
@ -80,7 +83,7 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
logger.info("closing bulk controller");
bulkController.close();
}
closeClient();
closeClient(settings);
}
}

@ -40,14 +40,17 @@ public abstract class AbstractNativeClient implements NativeClient {
protected ElasticsearchClient client;
protected final AtomicBoolean closed;
protected Settings settings;
protected AbstractNativeClient() {
private final AtomicBoolean closed;
public AbstractNativeClient() {
closed = new AtomicBoolean(false);
}
@Override
public void setClient(ElasticsearchClient client) {
logger.log(Level.INFO, "setting client = " + client);
this.client = client;
}
@ -56,16 +59,14 @@ public abstract class AbstractNativeClient implements NativeClient {
return client;
}
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException;
protected abstract void closeClient() throws IOException;
@Override
public void init(Settings settings) throws IOException {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
if (client == null) {
client = createClient(settings);
if (closed.compareAndSet(false, true)) {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
this.settings = settings;
setClient(createClient(settings));
} else {
logger.log(Level.WARN, "not initializing");
}
}
@ -166,10 +167,14 @@ public abstract class AbstractNativeClient implements NativeClient {
public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) {
closeClient();
closeClient(settings);
}
}
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException;
protected abstract void closeClient(Settings settings) throws IOException;
protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
ensureClientIsPresent();
if (index == null) {
@ -217,5 +222,4 @@ public abstract class AbstractNativeClient implements NativeClient {
throw new IllegalArgumentException("unknown time unit: " + timeUnit);
}
}
}

@ -25,7 +25,7 @@ public class MockAdminClient extends AbstractAdminClient {
}
@Override
protected void closeClient() {
protected void closeClient(Settings settings) {
}
@Override

@ -36,7 +36,7 @@ public class MockBulkClient extends AbstractBulkClient {
}
@Override
protected void closeClient() {
protected void closeClient(Settings settings) {
}
@Override

@ -34,7 +34,7 @@ public class MockSearchClient extends AbstractSearchClient {
}
@Override
protected void closeClient() {
protected void closeClient(Settings settings) {
}
@Override

@ -4,8 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
public class NodeAdminClient extends AbstractAdminClient {
private final NodeClientHelper helper;
@ -15,12 +13,12 @@ public class NodeAdminClient extends AbstractAdminClient {
}
@Override
public ElasticsearchClient createClient(Settings settings) throws IOException {
public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null);
}
@Override
public void closeClient() {
helper.closeClient();
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractBulkClient;
import java.io.IOException;
public class NodeBulkClient extends AbstractBulkClient {
@ -14,12 +13,12 @@ public class NodeBulkClient extends AbstractBulkClient {
}
@Override
public ElasticsearchClient createClient(Settings settings) throws IOException {
public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null);
}
@Override
public void closeClient() {
helper.closeClient();
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

@ -10,18 +10,18 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class NodeClientHelper {
private static final Logger logger = LogManager.getLogger(NodeClientHelper.class.getName());
private static Node node;
private static ElasticsearchClient client;
private static Object configurationObject;
private final Object lock = new Object();
private static Node node;
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
public ElasticsearchClient createClient(Settings settings, Object object) {
if (configurationObject == null) {
@ -30,40 +30,38 @@ public class NodeClientHelper {
if (configurationObject instanceof ElasticsearchClient) {
return (ElasticsearchClient) configurationObject;
}
if (client == null) {
synchronized (lock) {
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build();
logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
node = new BulkNode(new Environment(effectiveSettings), plugins);
node.start();
client = node.client();
}
}
return client;
return clientMap.computeIfAbsent(settings.get("cluster.name"),
key -> innerCreateClient(settings));
}
public void closeClient() {
synchronized (lock) {
if (client != null) {
logger.debug("closing node...");
node.close();
node = null;
client = null;
}
public void closeClient(Settings settings) {
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name"));
if (client != null) {
logger.debug("closing node...");
node.close();
node = null;
}
}
private ElasticsearchClient innerCreateClient(Settings settings) {
String version = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build();
logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
node = new BulkNode(new Environment(effectiveSettings), plugins);
node.start();
return node.client();
}
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
public class NodeSearchClient extends AbstractSearchClient {
@ -14,12 +13,12 @@ public class NodeSearchClient extends AbstractSearchClient {
}
@Override
public ElasticsearchClient createClient(Settings settings) throws IOException {
public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null);
}
@Override
public void closeClient() {
helper.closeClient();
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

@ -30,7 +30,7 @@ public class TransportAdminClient extends AbstractAdminClient {
}
@Override
public void closeClient() {
helper.closeClient();
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

@ -18,7 +18,7 @@ public class TransportBulkClient extends AbstractBulkClient {
}
@Override
public ElasticsearchClient createClient(Settings settings) throws IOException {
public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings, null);
}
@ -29,7 +29,7 @@ public class TransportBulkClient extends AbstractBulkClient {
}
@Override
public void closeClient() {
helper.closeClient();
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

@ -21,17 +21,17 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TransportClientHelper {
private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName());
private static ElasticsearchClient client;
private static Object configurationObject;
private final Object lock = new Object();
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
public ElasticsearchClient createClient(Settings settings, Object object) {
if (configurationObject == null && object != null) {
@ -40,46 +40,17 @@ public class TransportClientHelper {
if (configurationObject instanceof ElasticsearchClient) {
return (ElasticsearchClient) configurationObject;
}
if (client == null) {
synchronized (lock) {
String systemIdentifier = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();
Settings effectiveSettings = Settings.builder()
// for thread pool size
.put("processors",
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.sniff", false) // do not sniff
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
// custom settings may override defaults
.put(settings)
.build();
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
// we need to disable dead lock check because we may have mixed node/transport clients
DefaultChannelFuture.setUseDeadLockChecker(false);
client = TransportClient.builder().settings(effectiveSettings).build();
}
}
return client;
return clientMap.computeIfAbsent(settings.get("cluster.name"),
key -> innerCreateClient(settings));
}
public void closeClient() {
synchronized (lock) {
if (client != null) {
if (client instanceof Client) {
((Client) client).close();
}
if (client != null) {
client.threadPool().shutdownNow();
}
client = null;
public void closeClient(Settings settings) {
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name"));
if (client != null) {
if (client instanceof Client) {
((Client) client).close();
}
client.threadPool().shutdownNow();
}
}
@ -143,4 +114,29 @@ public class TransportClientHelper {
transportClient.addTransportAddress(discoveryNode.getAddress());
}
}
private ElasticsearchClient innerCreateClient(Settings settings) {
String systemIdentifier = System.getProperty("os.name")
+ " " + System.getProperty("java.vm.name")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();
Settings effectiveSettings = Settings.builder()
// for thread pool size
.put("processors",
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.sniff", false) // do not sniff
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
// custom settings may override defaults
.put(settings)
.build();
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
// we need to disable dead lock check because we may have mixed node/transport clients
DefaultChannelFuture.setUseDeadLockChecker(false);
return TransportClient.builder().settings(effectiveSettings).build();
}
}

@ -29,7 +29,7 @@ public class TransportSearchClient extends AbstractSearchClient {
}
@Override
public void closeClient() {
helper.closeClient();
public void closeClient(Settings settings) {
helper.closeClient(settings);
}
}

Loading…
Cancel
Save