diff --git a/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java b/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java index 545e9e8..970268e 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java @@ -1,5 +1,8 @@ package org.xbib.elasticsearch; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; @@ -21,9 +24,6 @@ import java.util.TreeSet; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** * */ diff --git a/src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java b/src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java index 1815326..d098332 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java +++ b/src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java @@ -1,5 +1,7 @@ package org.xbib.elasticsearch; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; + import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; @@ -27,21 +29,27 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; - /** * */ public class NodeTestUtils { - protected static final ESLogger logger = ESLoggerFactory.getLogger("test"); + private static final ESLogger logger = ESLoggerFactory.getLogger("test"); + private static Random random = new Random(); + private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); + private Map nodes = new HashMap<>(); + private Map clients = new HashMap<>(); + private AtomicInteger counter = new AtomicInteger(); + private String cluster; + private String host; + private int port; private static void deleteFiles() throws IOException { @@ -72,13 +80,14 @@ public class NodeTestUtils { findNodeAddress(); try { ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN).timeout(TimeValue.timeValueSeconds(30))).actionGet(); + new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) + .timeout(TimeValue.timeValueSeconds(30))).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { throw new IOException("cluster state is " + healthResponse.getStatus().name() + ", from here on, everything will fail!"); } } catch (ElasticsearchTimeoutException e) { - throw new IOException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations"); + throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); } } catch (Throwable t) { logger.error("startNodes failed", t); @@ -95,7 +104,7 @@ public class NodeTestUtils { try { deleteFiles(); logger.info("data files wiped"); - Thread.sleep(2000L); + Thread.sleep(2000L); // let OS commit changes } catch (IOException e) { logger.error(e.getMessage(), e); } catch (InterruptedException e) { diff --git a/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java b/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java index 8146b19..8d1276a 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java @@ -1,24 +1,28 @@ package org.xbib.elasticsearch; +import static org.elasticsearch.client.Requests.indexRequest; +import static org.elasticsearch.client.Requests.refreshRequest; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.sort.SortOrder; import org.junit.Test; -import static org.elasticsearch.client.Requests.indexRequest; -import static org.elasticsearch.client.Requests.refreshRequest; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - /** * */ public class SearchTest extends NodeTestUtils { + private static final ESLogger logger = ESLoggerFactory.getLogger("test"); + @Test public void testSearch() throws Exception { Client client = client("1"); diff --git a/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java b/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java index 7a25dce..0af13df 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java @@ -1,5 +1,10 @@ package org.xbib.elasticsearch; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.junit.Assert.assertEquals; + import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.index.IndexAction; @@ -7,11 +12,6 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.settings.Settings; import org.junit.Test; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.junit.Assert.assertEquals; - /** * */ @@ -56,4 +56,4 @@ public class SimpleTest extends NodeTestUtils { assertEquals(doc, "{\"field\":\"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8\"}"); } -} \ No newline at end of file +} diff --git a/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java b/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java index fd9ce16..6e252d1 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java @@ -1,5 +1,10 @@ package org.xbib.elasticsearch; +import static org.elasticsearch.client.Requests.indexRequest; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery; + import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; @@ -7,11 +12,6 @@ import org.junit.Test; import java.io.IOException; -import static org.elasticsearch.client.Requests.indexRequest; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery; - /** * */ @@ -66,5 +66,4 @@ public class WildcardTest extends NodeTestUtils { throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits); } } - -} \ No newline at end of file +} diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java index c7f7421..e01ca67 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java @@ -1,5 +1,9 @@ package org.xbib.elasticsearch.extras.client.node; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; @@ -24,10 +28,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - /** * */ diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java index 58e2b8e..09c628d 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java @@ -3,6 +3,8 @@ package org.xbib.elasticsearch.extras.client.node; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.junit.Before; @@ -16,6 +18,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; */ public class BulkNodeClusterBlockTest extends NodeTestUtils { + private static final ESLogger logger = ESLoggerFactory.getLogger("test"); + @Before public void startNodes() { try { diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java index 98c6a70..7d8ba1f 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java @@ -15,13 +15,16 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.*; +/** + * + */ public class BulkNodeDuplicateIDTest extends NodeTestUtils { - private final static ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName()); + private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName()); - private final static Long MAX_ACTIONS = 1000L; + private static final Long MAX_ACTIONS = 1000L; - private final static Long NUM_ACTIONS = 12345L; + private static final Long NUM_ACTIONS = 12345L; @Test public void testDuplicateDocIDs() throws Exception { diff --git a/src/integration-test/resources/log4j2.xml b/src/integration-test/resources/log4j2.xml index f71aced..b175dfc 100644 --- a/src/integration-test/resources/log4j2.xml +++ b/src/integration-test/resources/log4j2.xml @@ -6,7 +6,7 @@ - + diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java index bac6522..877067b 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java @@ -87,7 +87,7 @@ public abstract class AbstractClient { } public void resetSettings() { - settingsBuilder = Settings.settingsBuilder(); + this.settingsBuilder = Settings.settingsBuilder(); settings = null; mappings = new HashMap<>(); } @@ -166,10 +166,10 @@ public abstract class AbstractClient { if (value == null) { throw new IOException("no value given"); } - Settings.Builder settingsBuilder = Settings.settingsBuilder(); - settingsBuilder.put(key, value.toString()); + Settings.Builder updateSettingsBuilder = Settings.settingsBuilder(); + updateSettingsBuilder.put(key, value.toString()); UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) - .settings(settingsBuilder); + .settings(updateSettingsBuilder); client().execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } @@ -194,8 +194,7 @@ public abstract class AbstractClient { return shards; } - public void waitForCluster(String statusString, TimeValue timeout) - throws IOException, ElasticsearchTimeoutException { + public void waitForCluster(String statusString, TimeValue timeout) throws IOException { if (client() == null) { return; } @@ -222,11 +221,14 @@ public abstract class AbstractClient { int nodeCount = clusterStateResponse.getState().getNodes().size(); return name + " (" + nodeCount + " nodes connected)"; } catch (ElasticsearchTimeoutException e) { + logger.warn(e.getMessage(), e); return "TIMEOUT"; } catch (NoNodeAvailableException e) { + logger.warn(e.getMessage(), e); return "DISCONNECTED"; - } catch (Throwable t) { - return "[" + t.getMessage() + "]"; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return "[" + e.getMessage() + "]"; } } @@ -241,11 +243,14 @@ public abstract class AbstractClient { ClusterHealthStatus status = healthResponse.getStatus(); return status.name(); } catch (ElasticsearchTimeoutException e) { + logger.warn(e.getMessage(), e); return "TIMEOUT"; } catch (NoNodeAvailableException e) { + logger.warn(e.getMessage(), e); return "DISCONNECTED"; - } catch (Throwable t) { - return "[" + t.getMessage() + "]"; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return "[" + e.getMessage() + "]"; } } @@ -310,10 +315,8 @@ public abstract class AbstractClient { Set indices = new TreeSet<>(Collections.reverseOrder()); for (ObjectCursor indexName : getAliasesResponse.getAliases().keys()) { Matcher m = pattern.matcher(indexName.value); - if (m.matches()) { - if (alias.equals(m.group(1))) { - indices.add(indexName.value); - } + if (m.matches() && alias.equals(m.group(1))) { + indices.add(indexName.value); } } return indices.isEmpty() ? alias : indices.iterator().next(); @@ -424,10 +427,8 @@ public abstract class AbstractClient { logger.info("{} indices", getIndexResponse.getIndices().length); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); - if (m.matches()) { - if (index.equals(m.group(1)) && !s.equals(concreteIndex)) { - indices.add(s); - } + if (m.matches() && index.equals(m.group(1)) && !s.equals(concreteIndex)) { + indices.add(s); } } if (indices.isEmpty()) { @@ -470,21 +471,21 @@ public abstract class AbstractClient { } } - public Long mostRecentDocument(String index) { + public Long mostRecentDocument(String index, String timestampfieldname) { if (client() == null) { return null; } SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client(), SearchAction.INSTANCE); - SortBuilder sort = SortBuilders.fieldSort("_timestamp").order(SortOrder.DESC); + SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); SearchResponse searchResponse = searchRequestBuilder.setIndices(index) - .addField("_timestamp") + .addField(timestampfieldname) .setSize(1) .addSort(sort) .execute().actionGet(); if (searchResponse.getHits().getHits().length == 1) { SearchHit hit = searchResponse.getHits().getHits()[0]; - if (hit.getFields().get("_timestamp") != null) { - return hit.getFields().get("_timestamp").getValue(); + if (hit.getFields().get(timestampfieldname) != null) { + return hit.getFields().get(timestampfieldname).getValue(); } else { return 0L; } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java index 223c27e..814a7c7 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java @@ -197,11 +197,10 @@ public class BulkProcessor implements Closeable { } private void execute() { - final BulkRequest bulkRequest = this.bulkRequest; + final BulkRequest myBulkRequest = this.bulkRequest; final long executionId = executionIdGen.incrementAndGet(); - this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler.execute(bulkRequest, executionId); + this.bulkRequestHandler.execute(myBulkRequest, executionId); } private boolean isOverTheLimit() { @@ -372,15 +371,15 @@ public class BulkProcessor implements Closeable { /** * Abstracts the low-level details of bulk request handling. */ - abstract class BulkRequestHandler { + interface BulkRequestHandler { - public abstract void execute(BulkRequest bulkRequest, long executionId); + void execute(BulkRequest bulkRequest, long executionId); - public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; + boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; } - private class SyncBulkRequestHandler extends BulkRequestHandler { + private class SyncBulkRequestHandler implements BulkRequestHandler { private final Client client; private final BulkProcessor.Listener listener; @@ -389,6 +388,7 @@ public class BulkProcessor implements Closeable { this.listener = listener; } + @Override public void execute(BulkRequest bulkRequest, long executionId) { boolean afterCalled = false; try { @@ -396,19 +396,20 @@ public class BulkProcessor implements Closeable { BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet(); afterCalled = true; listener.afterBulk(executionId, bulkRequest, bulkResponse); - } catch (Throwable t) { + } catch (Exception e) { if (!afterCalled) { - listener.afterBulk(executionId, bulkRequest, t); + listener.afterBulk(executionId, bulkRequest, e); } } } + @Override public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { return true; } } - private class AsyncBulkRequestHandler extends BulkRequestHandler { + private class AsyncBulkRequestHandler implements BulkRequestHandler { private final Client client; private final BulkProcessor.Listener listener; private final Semaphore semaphore; @@ -452,8 +453,8 @@ public class BulkProcessor implements Closeable { } catch (InterruptedException e) { Thread.currentThread().interrupt(); listener.afterBulk(executionId, bulkRequest, e); - } catch (Throwable t) { - listener.afterBulk(executionId, bulkRequest, t); + } catch (Exception e) { + listener.afterBulk(executionId, bulkRequest, e); } finally { if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore semaphore.release(); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java b/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java index c643924..74de495 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java @@ -355,13 +355,14 @@ public interface ClientMethods extends Parameters { void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep); /** - * Log the timestamp of the most recently indexed document in the index. + * Find the timestamp of the most recently indexed document in the index. * * @param index the index name + * @param timestampfieldname the timestamp field name * @return millis UTC millis of the most recent document * @throws IOException if most rcent document can not be found */ - Long mostRecentDocument(String index) throws IOException; + Long mostRecentDocument(String index, String timestampfieldname) throws IOException; /** * Get metric. diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/IndexAliasAdder.java b/src/main/java/org/xbib/elasticsearch/extras/client/IndexAliasAdder.java index 8ce2df5..a659ab4 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/IndexAliasAdder.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/IndexAliasAdder.java @@ -5,6 +5,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder /** * */ +@FunctionalInterface public interface IndexAliasAdder { void addIndexAlias(IndicesAliasesRequestBuilder builder, String index, String alias); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java b/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java index 4dd69da..9c5ffc2 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java @@ -1,5 +1,8 @@ package org.xbib.elasticsearch.extras.client; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; + import java.io.IOException; import java.net.Inet4Address; import java.net.Inet6Address; @@ -8,7 +11,6 @@ import java.net.NetworkInterface; import java.net.SocketException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.Enumeration; import java.util.List; import java.util.Locale; @@ -18,27 +20,30 @@ import java.util.Locale; */ public class NetworkUtils { - private static final String IPv4_SETTING = "java.net.preferIPv4Stack"; + private static final ESLogger logger = ESLoggerFactory.getLogger(NetworkUtils.class.getName()); - private static final String IPv6_SETTING = "java.net.preferIPv6Addresses"; + private static final String IPV4_SETTING = "java.net.preferIPv4Stack"; - private static final InetAddress localAddress; + private static final String IPV6_SETTING = "java.net.preferIPv6Addresses"; + + private static final InetAddress LOCAL_ADDRESS; static { InetAddress address; try { address = InetAddress.getLocalHost(); - } catch (Throwable e) { + } catch (Exception e) { + logger.warn(e.getMessage(), e); address = InetAddress.getLoopbackAddress(); } - localAddress = address; + LOCAL_ADDRESS = address; } private NetworkUtils() { } public static InetAddress getLocalAddress() { - return localAddress; + return LOCAL_ADDRESS; } public static InetAddress getFirstNonLoopbackAddress(ProtocolVersion ipversion) throws SocketException { @@ -49,6 +54,7 @@ public class NetworkUtils { continue; } } catch (Exception e) { + logger.warn(e.getMessage(), e); continue; } address = getFirstNonLoopbackAddress(networkInterface, ipversion); @@ -66,11 +72,9 @@ public class NetworkUtils { } for (Enumeration addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) { InetAddress address = addresses.nextElement(); - if (!address.isLoopbackAddress()) { - if ((address instanceof Inet4Address && ipVersion == ProtocolVersion.IPv4) || - (address instanceof Inet6Address && ipVersion == ProtocolVersion.IPv6)) { - return address; - } + if (!address.isLoopbackAddress() && (address instanceof Inet4Address && ipVersion == ProtocolVersion.IPV4) || + (address instanceof Inet6Address && ipVersion == ProtocolVersion.IPV6)) { + return address; } } return null; @@ -83,8 +87,8 @@ public class NetworkUtils { } for (Enumeration addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) { InetAddress address = addresses.nextElement(); - if ((address instanceof Inet4Address && ipVersion == ProtocolVersion.IPv4) || - (address instanceof Inet6Address && ipVersion == ProtocolVersion.IPv6)) { + if ((address instanceof Inet4Address && ipVersion == ProtocolVersion.IPV4) || + (address instanceof Inet6Address && ipVersion == ProtocolVersion.IPV6)) { return address; } } @@ -122,18 +126,20 @@ public class NetworkUtils { public static ProtocolVersion getProtocolVersion() throws SocketException { switch (findAvailableProtocols()) { - case IPv4: - return ProtocolVersion.IPv4; - case IPv6: - return ProtocolVersion.IPv6; - case IPv46: - if (Boolean.getBoolean(System.getProperty(IPv4_SETTING))) { - return ProtocolVersion.IPv4; + case IPV4: + return ProtocolVersion.IPV4; + case IPV6: + return ProtocolVersion.IPV6; + case IPV46: + if (Boolean.getBoolean(System.getProperty(IPV4_SETTING))) { + return ProtocolVersion.IPV4; } - if (Boolean.getBoolean(System.getProperty(IPv6_SETTING))) { - return ProtocolVersion.IPv6; + if (Boolean.getBoolean(System.getProperty(IPV6_SETTING))) { + return ProtocolVersion.IPV6; } - return ProtocolVersion.IPv6; + return ProtocolVersion.IPV6; + default: + break; } return ProtocolVersion.NONE; } @@ -150,18 +156,19 @@ public class NetworkUtils { } } if (hasIPv4 && hasIPv6) { - return ProtocolVersion.IPv46; + return ProtocolVersion.IPV46; } if (hasIPv4) { - return ProtocolVersion.IPv4; + return ProtocolVersion.IPV4; } if (hasIPv6) { - return ProtocolVersion.IPv6; + return ProtocolVersion.IPV6; } return ProtocolVersion.NONE; } - public static InetAddress resolveInetAddress(String host, String defaultValue) throws IOException { + public static InetAddress resolveInetAddress(String hostname, String defaultValue) throws IOException { + String host = hostname; if (host == null) { host = defaultValue; } @@ -172,23 +179,23 @@ public class NetworkUtils { } if ((host.startsWith("#") && host.endsWith("#")) || (host.startsWith("_") && host.endsWith("_"))) { host = host.substring(1, host.length() - 1); - if (host.equals("local")) { + if ("local".equals(host)) { return getLocalAddress(); } else if (host.startsWith("non_loopback")) { if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) { - return getFirstNonLoopbackAddress(ProtocolVersion.IPv4); + return getFirstNonLoopbackAddress(ProtocolVersion.IPV4); } else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) { - return getFirstNonLoopbackAddress(ProtocolVersion.IPv6); + return getFirstNonLoopbackAddress(ProtocolVersion.IPV6); } else { return getFirstNonLoopbackAddress(getProtocolVersion()); } } else { ProtocolVersion protocolVersion = getProtocolVersion(); if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) { - protocolVersion = ProtocolVersion.IPv4; + protocolVersion = ProtocolVersion.IPV4; host = host.substring(0, host.length() - 5); } else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) { - protocolVersion = ProtocolVersion.IPv6; + protocolVersion = ProtocolVersion.IPV6; host = host.substring(0, host.length() - 5); } for (NetworkInterface ni : getAllAvailableInterfaces()) { @@ -227,27 +234,17 @@ public class NetworkUtils { } private static void sortInterfaces(List interfaces) { - Collections.sort(interfaces, new Comparator() { - @Override - public int compare(NetworkInterface o1, NetworkInterface o2) { - return Integer.compare(o1.getIndex(), o2.getIndex()); - } - }); + Collections.sort(interfaces, (o1, o2) -> Integer.compare(o1.getIndex(), o2.getIndex())); } private static void sortAddresses(List addressList) { - Collections.sort(addressList, new Comparator() { - @Override - public int compare(InetAddress o1, InetAddress o2) { - return compareBytes(o1.getAddress(), o2.getAddress()); - } - }); + Collections.sort(addressList, (o1, o2) -> compareBytes(o1.getAddress(), o2.getAddress())); } private static int compareBytes(byte[] left, byte[] right) { for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) { - int a = (left[i] & 0xff); - int b = (right[j] & 0xff); + int a = left[i] & 0xff; + int b = right[j] & 0xff; if (a != b) { return a - b; } @@ -259,6 +256,6 @@ public class NetworkUtils { * */ public enum ProtocolVersion { - IPv4, IPv6, IPv46, NONE + IPV4, IPV6, IPV46, NONE } } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/SimpleBulkMetric.java b/src/main/java/org/xbib/elasticsearch/extras/client/SimpleBulkMetric.java index bfbde5a..e836816 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/SimpleBulkMetric.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/SimpleBulkMetric.java @@ -65,7 +65,7 @@ public class SimpleBulkMetric implements BulkMetric { @Override public void start() { this.started = System.nanoTime(); - this.totalIngest.spawn(5L); + totalIngest.spawn(5L); } @Override diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java index 5a0df14..74a8dc4 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java @@ -54,6 +54,8 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL; + private Node node; + private ElasticsearchClient client; private BulkProcessor bulkProcessor; @@ -66,9 +68,6 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { private boolean closed; - public BulkNodeClient() { - } - @Override public BulkNodeClient maxActionsPerRequest(int maxActionsPerRequest) { this.maxActionsPerRequest = maxActionsPerRequest; @@ -196,12 +195,12 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { } @Override - protected void createClient(Settings settings) throws IOException { + protected synchronized void createClient(Settings settings) throws IOException { if (client != null) { logger.warn("client is open, closing..."); client.threadPool().shutdown(); - logger.warn("client is closed"); client = null; + node.close(); } if (settings != null) { String version = System.getProperty("os.name") @@ -216,7 +215,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { logger.info("creating node client on {} with effective settings {}", version, effectiveSettings.getAsMap()); Collection> plugins = Collections.emptyList(); - Node node = new BulkNode(new Environment(effectiveSettings), plugins); + this.node = new BulkNode(new Environment(effectiveSettings), plugins); node.start(); this.client = node.client(); } @@ -230,7 +229,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient index(String index, String type, String id, String source) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { if (metric != null) { @@ -248,7 +247,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient bulkIndex(IndexRequest indexRequest) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { if (metric != null) { @@ -266,7 +265,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient delete(String index, String type, String id) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { if (metric != null) { @@ -284,7 +283,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient bulkDelete(DeleteRequest deleteRequest) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { if (metric != null) { @@ -302,7 +301,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient update(String index, String type, String id, String source) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { if (metric != null) { @@ -320,7 +319,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient bulkUpdate(UpdateRequest updateRequest) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { if (metric != null) { @@ -338,7 +337,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient flushIngest() { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } logger.debug("flushing bulk processor"); bulkProcessor.flush(); @@ -348,7 +347,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } while (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) { logger.warn("still waiting for responses"); @@ -395,6 +394,10 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { } metric.stop(); } + if (node != null) { + logger.debug("closing node..."); + node.close(); + } } catch (Exception e) { logger.error(e.getMessage(), e); } @@ -416,7 +419,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient newIndex(String index, Settings settings, Map mappings) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } if (client == null) { logger.warn("no client for create index"); @@ -433,9 +436,11 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { createIndexRequestBuilder.setSettings(settings); } if (mappings != null) { - for (String type : mappings.keySet()) { + for (Map.Entry entry : mappings.entrySet()) { + String type = entry.getKey(); + String mapping = entry.getValue(); logger.info("found mapping for {}", type); - createIndexRequestBuilder.addMapping(type, mappings.get(type)); + createIndexRequestBuilder.addMapping(type, mapping); } } createIndexRequestBuilder.execute().actionGet(); @@ -458,7 +463,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { @Override public BulkNodeClient deleteIndex(String index) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } if (client == null) { logger.warn("no client"); @@ -488,10 +493,15 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { return settings(); } + @Override public Settings.Builder getSettingsBuilder() { return settingsBuilder(); } + private static void throwClose() { + throw new ElasticsearchException("client is closed"); + } + private class BulkNode extends Node { BulkNode(Environment env, Collection> classpathPlugins) { diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java index ac2a00e..b03aeef 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java @@ -28,9 +28,9 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.xbib.elasticsearch.extras.client.AbstractClient; -import org.xbib.elasticsearch.extras.client.BulkProcessor; -import org.xbib.elasticsearch.extras.client.BulkMetric; import org.xbib.elasticsearch.extras.client.BulkControl; +import org.xbib.elasticsearch.extras.client.BulkMetric; +import org.xbib.elasticsearch.extras.client.BulkProcessor; import org.xbib.elasticsearch.extras.client.ClientMethods; import org.xbib.elasticsearch.extras.client.NetworkUtils; @@ -75,9 +75,6 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods private boolean isShutdown; - public BulkTransportClient() { - } - @Override public BulkTransportClient init(ElasticsearchClient client, BulkMetric metric, BulkControl control) throws IOException { return init(findSettings(), metric, control); @@ -198,7 +195,6 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods logger.warn("client is open, closing..."); client.close(); client.threadPool().shutdown(); - logger.warn("client is closed"); client = null; } if (settings != null) { @@ -257,7 +253,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public ClientMethods newIndex(String index) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } return newIndex(index, null, null); } @@ -273,11 +269,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public ClientMethods newIndex(String index, Settings settings, Map mappings) { if (closed) { - throw new ElasticsearchException("client is closed"); - } - if (client == null) { - logger.warn("no client for create index"); - return this; + throwClose(); } if (index == null) { logger.warn("no index name given to create index"); @@ -290,9 +282,11 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods createIndexRequestBuilder.setSettings(settings); } if (mappings != null) { - for (String type : mappings.keySet()) { + for (Map.Entry entry : mappings.entrySet()) { + String type = entry.getKey(); + String mapping = entry.getValue(); logger.info("found mapping for {}", type); - createIndexRequestBuilder.addMapping(type, mappings.get(type)); + createIndexRequestBuilder.addMapping(type, mapping); } } createIndexRequestBuilder.execute().actionGet(); @@ -303,11 +297,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public ClientMethods deleteIndex(String index) { if (closed) { - throw new ElasticsearchException("client is closed"); - } - if (client == null) { - logger.warn("no client for delete index"); - return this; + throwClose(); } if (index == null) { logger.warn("no index name given to delete index"); @@ -345,7 +335,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public BulkTransportClient index(String index, String type, String id, String source) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { metric.getCurrentIngest().inc(index, type, id); @@ -361,7 +351,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public BulkTransportClient bulkIndex(IndexRequest indexRequest) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); @@ -377,7 +367,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public BulkTransportClient delete(String index, String type, String id) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { metric.getCurrentIngest().inc(index, type, id); @@ -393,7 +383,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public BulkTransportClient bulkDelete(DeleteRequest deleteRequest) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); @@ -409,7 +399,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public BulkTransportClient update(String index, String type, String id, String source) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { metric.getCurrentIngest().inc(index, type, id); @@ -425,7 +415,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public BulkTransportClient bulkUpdate(UpdateRequest updateRequest) { if (closed) { - throw new ElasticsearchException("client is closed"); + throwClose(); } try { metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); @@ -441,11 +431,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods @Override public synchronized BulkTransportClient flushIngest() { if (closed) { - throw new ElasticsearchException("client is closed"); - } - if (client == null) { - logger.warn("no client"); - return this; + throwClose(); } logger.debug("flushing bulk processor"); bulkProcessor.flush(); @@ -456,11 +442,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException { if (closed) { - throw new ElasticsearchException("client is closed"); - } - if (client == null) { - logger.warn("no client"); - return this; + throwClose(); } bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); return this; @@ -470,11 +452,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods public synchronized void shutdown() { if (closed) { shutdownClient(); - throw new ElasticsearchException("client is closed"); - } - if (client == null) { - logger.warn("no client"); - return; + throwClose(); } try { if (bulkProcessor != null) { @@ -532,7 +510,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods try { port = Integer.parseInt(splitHost[1]); } catch (Exception e) { - // ignore + logger.warn(e.getMessage(), e); } addresses.add(new InetSocketTransportAddress(inetAddress, port)); } @@ -545,6 +523,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods return addresses; } + private static void throwClose() { + throw new ElasticsearchException("client is closed"); + } + private void shutdownClient() { if (client != null) { logger.debug("shutdown started"); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java index 76bf69e..86199d2 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java @@ -19,9 +19,6 @@ import java.util.Map; */ public class MockTransportClient extends BulkTransportClient { - public MockTransportClient() { - } - @Override public ElasticsearchClient client() { return null; @@ -124,18 +121,22 @@ public class MockTransportClient extends BulkTransportClient { @Override public void putMapping(String index) { + // mockup method } @Override public void refreshIndex(String index) { + // mockup method } @Override public void flushIndex(String index) { + // mockup method } @Override public void waitForCluster(String healthColor, TimeValue timeValue) throws IOException { + // mockup method } @Override @@ -150,7 +151,7 @@ public class MockTransportClient extends BulkTransportClient { @Override public void shutdown() { - // do nothing + // mockup method } } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java index 423503e..18b6f7c 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java @@ -4,7 +4,6 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -32,9 +31,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.ModulesBuilder; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; @@ -49,7 +45,6 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.SearchModule; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; -import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.FutureTransportResponseHandler; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportRequestOptions; @@ -67,9 +62,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** - * Stripped-down transport client without node sampling. + * Stripped-down transport client without node sampling and without retrying. + * * Merged together: original TransportClient, TransportClientNodesServce, TransportClientProxy - * Configurable ping interval setting added + + * Configurable connect ping interval setting added. */ public class TransportClient extends AbstractClient { @@ -260,7 +257,7 @@ public class TransportClient extends AbstractClient { try { injector.getInstance(MonitorService.class).close(); } catch (Exception e) { - // ignore, might not be bounded + logger.debug(e.getMessage(), e); } for (Class plugin : injector.getInstance(PluginsService.class).nodeServices()) { injector.getInstance(plugin).close(); @@ -268,7 +265,7 @@ public class TransportClient extends AbstractClient { try { ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS); } catch (Exception e) { - // ignore + logger.debug(e.getMessage(), e); } injector.getInstance(PageCacheRecycler.class).close(); } @@ -281,7 +278,7 @@ public class TransportClient extends AbstractClient { try { logger.trace("connecting to listed node (light) [{}]", listedNode); transportService.connectToNodeLight(listedNode); - } catch (Throwable e) { + } catch (Exception e) { logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); continue; } @@ -310,7 +307,7 @@ public class TransportClient extends AbstractClient { listedNode); newNodes.add(listedNode); } - } catch (Throwable e) { + } catch (Exception e) { logger.info("failed to get node info for {}, disconnecting...", e, listedNode); transportService.disconnectFromNode(listedNode); } @@ -321,7 +318,7 @@ public class TransportClient extends AbstractClient { try { logger.trace("connecting to node [{}]", node); transportService.connectToNode(node); - } catch (Throwable e) { + } catch (Exception e) { it.remove(); logger.debug("failed to connect to discovered node [" + node + "]", e); } @@ -333,22 +330,14 @@ public class TransportClient extends AbstractClient { @Override @SuppressWarnings({"unchecked", "rawtypes"}) - protected > - void doExecute(Action action, final Request request, - ActionListener listener) { - final TransportActionNodeProxy proxyAction = proxyActionMap.getProxies().get(action); + protected > + void doExecute(Action action, final R request, final ActionListener listener) { + final TransportActionNodeProxy proxyAction = proxyActionMap.getProxies().get(action); if (proxyAction == null) { throw new IllegalStateException("undefined action " + action); } - NodeListenerCallback callback = new NodeListenerCallback() { - @Override - public void doWithNode(DiscoveryNode node, ActionListener listener) { - proxyAction.execute(node, request, listener); - } - }; - List nodes = this.nodes; - if (nodes.isEmpty()) { + List nodeList = this.nodes; + if (nodeList.isEmpty()) { throw new NoNodeAvailableException("none of the configured nodes are available: " + this.listedNodes); } int index = nodeCounter.incrementAndGet(); @@ -356,24 +345,14 @@ public class TransportClient extends AbstractClient { index = 0; nodeCounter.set(0); } - RetryListener retryListener = new RetryListener<>(callback, listener, nodes, index); - DiscoveryNode node = nodes.get((index) % nodes.size()); + // try once and never more try { - callback.doWithNode(node, retryListener); - } catch (Throwable t) { - listener.onFailure(t); + proxyAction.execute(nodeList.get(index % nodeList.size()), request, listener); + } catch (Exception e) { + listener.onFailure(e); } } - /** - * - * @param - */ - interface NodeListenerCallback { - - void doWithNode(DiscoveryNode node, ActionListener listener); - } - /** * */ @@ -397,19 +376,17 @@ public class TransportClient extends AbstractClient { } public TransportClient build() { - Settings settings = InternalSettingsPreparer.prepareSettings(this.settings); - settings = settingsBuilder() + Settings transportClientSettings = settingsBuilder() .put("transport.ping.schedule", this.settings.get("ping.interval", "30s")) - .put(settings) + .put(InternalSettingsPreparer.prepareSettings(this.settings)) .put("network.server", false) .put("node.client", true) .put(CLIENT_TYPE_SETTING, CLIENT_TYPE) .build(); - PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses); + PluginsService pluginsService = new PluginsService(transportClientSettings, null, null, pluginClasses); this.settings = pluginsService.updatedSettings(); Version version = Version.CURRENT; - final ThreadPool threadPool = new ThreadPool(settings); - + final ThreadPool threadPool = new ThreadPool(transportClientSettings); boolean success = false; try { ModulesBuilder modules = new ModulesBuilder(); @@ -447,49 +424,6 @@ public class TransportClient extends AbstractClient { } } - private static class RetryListener implements ActionListener { - private final ESLogger logger = ESLoggerFactory.getLogger(RetryListener.class.getName()); - private final NodeListenerCallback callback; - private final ActionListener listener; - private final List nodes; - private final int index; - - private volatile int n; - - RetryListener(NodeListenerCallback callback, ActionListener listener, - List nodes, int index) { - this.callback = callback; - this.listener = listener; - this.nodes = nodes; - this.index = index; - } - - @Override - public void onResponse(Response response) { - listener.onResponse(response); - } - - @Override - public void onFailure(Throwable e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { - int n = ++this.n; - if (n >= nodes.size()) { - listener.onFailure(new NoNodeAvailableException("none of the configured nodes were available: " - + nodes, e)); - } else { - try { - logger.warn("retrying on another node (n={}, nodes={})", n, nodes.size()); - callback.doWithNode(nodes.get((index + n) % nodes.size()), this); - } catch (final Throwable t) { - listener.onFailure(t); - } - } - } else { - listener.onFailure(e); - } - } - } - /** * The {@link ProxyActionMap} must be declared public. */ diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml deleted file mode 100644 index f71aced..0000000 --- a/src/test/resources/log4j2.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file