sonarqube fixes

This commit is contained in:
Jörg Prante 2016-11-01 19:28:58 +01:00
parent 7069e31fe0
commit 69bd4d9064
20 changed files with 218 additions and 284 deletions

View file

@ -1,5 +1,8 @@
package org.xbib.elasticsearch; package org.xbib.elasticsearch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
@ -21,9 +24,6 @@ import java.util.TreeSet;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* *
*/ */

View file

@ -1,5 +1,7 @@
package org.xbib.elasticsearch; package org.xbib.elasticsearch;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@ -27,21 +29,27 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
/** /**
* *
*/ */
public class NodeTestUtils { public class NodeTestUtils {
protected static final ESLogger logger = ESLoggerFactory.getLogger("test"); private static final ESLogger logger = ESLoggerFactory.getLogger("test");
private static Random random = new Random(); private static Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>(); private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>(); private Map<String, AbstractClient> clients = new HashMap<>();
private AtomicInteger counter = new AtomicInteger(); private AtomicInteger counter = new AtomicInteger();
private String cluster; private String cluster;
private String host; private String host;
private int port; private int port;
private static void deleteFiles() throws IOException { private static void deleteFiles() throws IOException {
@ -72,13 +80,14 @@ public class NodeTestUtils {
findNodeAddress(); findNodeAddress();
try { try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE, ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN).timeout(TimeValue.timeValueSeconds(30))).actionGet(); new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name() throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!"); + ", from here on, everything will fail!");
} }
} catch (ElasticsearchTimeoutException e) { } catch (ElasticsearchTimeoutException e) {
throw new IOException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations"); throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
} }
} catch (Throwable t) { } catch (Throwable t) {
logger.error("startNodes failed", t); logger.error("startNodes failed", t);
@ -95,7 +104,7 @@ public class NodeTestUtils {
try { try {
deleteFiles(); deleteFiles();
logger.info("data files wiped"); logger.info("data files wiped");
Thread.sleep(2000L); Thread.sleep(2000L); // let OS commit changes
} catch (IOException e) { } catch (IOException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View file

@ -1,24 +1,28 @@
package org.xbib.elasticsearch; package org.xbib.elasticsearch;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/** /**
* *
*/ */
public class SearchTest extends NodeTestUtils { public class SearchTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger("test");
@Test @Test
public void testSearch() throws Exception { public void testSearch() throws Exception {
Client client = client("1"); Client client = client("1");

View file

@ -1,5 +1,10 @@
package org.xbib.elasticsearch; package org.xbib.elasticsearch;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.junit.Assert.assertEquals;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
@ -7,11 +12,6 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.junit.Assert.assertEquals;
/** /**
* *
*/ */

View file

@ -1,5 +1,10 @@
package org.xbib.elasticsearch; package org.xbib.elasticsearch;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
@ -7,11 +12,6 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
/** /**
* *
*/ */
@ -66,5 +66,4 @@ public class WildcardTest extends NodeTestUtils {
throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits); throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits);
} }
} }
} }

View file

@ -1,5 +1,9 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elasticsearch.extras.client.node;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
@ -24,10 +28,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/** /**
* *
*/ */

View file

@ -3,6 +3,8 @@ package org.xbib.elasticsearch.extras.client.node;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.Before; import org.junit.Before;
@ -16,6 +18,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
*/ */
public class BulkNodeClusterBlockTest extends NodeTestUtils { public class BulkNodeClusterBlockTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger("test");
@Before @Before
public void startNodes() { public void startNodes() {
try { try {

View file

@ -15,13 +15,16 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/**
*
*/
public class BulkNodeDuplicateIDTest extends NodeTestUtils { public class BulkNodeDuplicateIDTest extends NodeTestUtils {
private final static ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName()); private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName());
private final static Long MAX_ACTIONS = 1000L; private static final Long MAX_ACTIONS = 1000L;
private final static Long NUM_ACTIONS = 12345L; private static final Long NUM_ACTIONS = 12345L;
@Test @Test
public void testDuplicateDocIDs() throws Exception { public void testDuplicateDocIDs() throws Exception {

View file

@ -6,7 +6,7 @@
</Console> </Console>
</appenders> </appenders>
<Loggers> <Loggers>
<Root level="info"> <Root level="debug">
<AppenderRef ref="Console" /> <AppenderRef ref="Console" />
</Root> </Root>
</Loggers> </Loggers>

View file

@ -87,7 +87,7 @@ public abstract class AbstractClient {
} }
public void resetSettings() { public void resetSettings() {
settingsBuilder = Settings.settingsBuilder(); this.settingsBuilder = Settings.settingsBuilder();
settings = null; settings = null;
mappings = new HashMap<>(); mappings = new HashMap<>();
} }
@ -166,10 +166,10 @@ public abstract class AbstractClient {
if (value == null) { if (value == null) {
throw new IOException("no value given"); throw new IOException("no value given");
} }
Settings.Builder settingsBuilder = Settings.settingsBuilder(); Settings.Builder updateSettingsBuilder = Settings.settingsBuilder();
settingsBuilder.put(key, value.toString()); updateSettingsBuilder.put(key, value.toString());
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
.settings(settingsBuilder); .settings(updateSettingsBuilder);
client().execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); client().execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
} }
@ -194,8 +194,7 @@ public abstract class AbstractClient {
return shards; return shards;
} }
public void waitForCluster(String statusString, TimeValue timeout) public void waitForCluster(String statusString, TimeValue timeout) throws IOException {
throws IOException, ElasticsearchTimeoutException {
if (client() == null) { if (client() == null) {
return; return;
} }
@ -222,11 +221,14 @@ public abstract class AbstractClient {
int nodeCount = clusterStateResponse.getState().getNodes().size(); int nodeCount = clusterStateResponse.getState().getNodes().size();
return name + " (" + nodeCount + " nodes connected)"; return name + " (" + nodeCount + " nodes connected)";
} catch (ElasticsearchTimeoutException e) { } catch (ElasticsearchTimeoutException e) {
logger.warn(e.getMessage(), e);
return "TIMEOUT"; return "TIMEOUT";
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn(e.getMessage(), e);
return "DISCONNECTED"; return "DISCONNECTED";
} catch (Throwable t) { } catch (Exception e) {
return "[" + t.getMessage() + "]"; logger.warn(e.getMessage(), e);
return "[" + e.getMessage() + "]";
} }
} }
@ -241,11 +243,14 @@ public abstract class AbstractClient {
ClusterHealthStatus status = healthResponse.getStatus(); ClusterHealthStatus status = healthResponse.getStatus();
return status.name(); return status.name();
} catch (ElasticsearchTimeoutException e) { } catch (ElasticsearchTimeoutException e) {
logger.warn(e.getMessage(), e);
return "TIMEOUT"; return "TIMEOUT";
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn(e.getMessage(), e);
return "DISCONNECTED"; return "DISCONNECTED";
} catch (Throwable t) { } catch (Exception e) {
return "[" + t.getMessage() + "]"; logger.warn(e.getMessage(), e);
return "[" + e.getMessage() + "]";
} }
} }
@ -310,10 +315,8 @@ public abstract class AbstractClient {
Set<String> indices = new TreeSet<>(Collections.reverseOrder()); Set<String> indices = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) { for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
Matcher m = pattern.matcher(indexName.value); Matcher m = pattern.matcher(indexName.value);
if (m.matches()) { if (m.matches() && alias.equals(m.group(1))) {
if (alias.equals(m.group(1))) { indices.add(indexName.value);
indices.add(indexName.value);
}
} }
} }
return indices.isEmpty() ? alias : indices.iterator().next(); return indices.isEmpty() ? alias : indices.iterator().next();
@ -424,10 +427,8 @@ public abstract class AbstractClient {
logger.info("{} indices", getIndexResponse.getIndices().length); logger.info("{} indices", getIndexResponse.getIndices().length);
for (String s : getIndexResponse.getIndices()) { for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s); Matcher m = pattern.matcher(s);
if (m.matches()) { if (m.matches() && index.equals(m.group(1)) && !s.equals(concreteIndex)) {
if (index.equals(m.group(1)) && !s.equals(concreteIndex)) { indices.add(s);
indices.add(s);
}
} }
} }
if (indices.isEmpty()) { if (indices.isEmpty()) {
@ -470,21 +471,21 @@ public abstract class AbstractClient {
} }
} }
public Long mostRecentDocument(String index) { public Long mostRecentDocument(String index, String timestampfieldname) {
if (client() == null) { if (client() == null) {
return null; return null;
} }
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client(), SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client(), SearchAction.INSTANCE);
SortBuilder sort = SortBuilders.fieldSort("_timestamp").order(SortOrder.DESC); SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchResponse searchResponse = searchRequestBuilder.setIndices(index) SearchResponse searchResponse = searchRequestBuilder.setIndices(index)
.addField("_timestamp") .addField(timestampfieldname)
.setSize(1) .setSize(1)
.addSort(sort) .addSort(sort)
.execute().actionGet(); .execute().actionGet();
if (searchResponse.getHits().getHits().length == 1) { if (searchResponse.getHits().getHits().length == 1) {
SearchHit hit = searchResponse.getHits().getHits()[0]; SearchHit hit = searchResponse.getHits().getHits()[0];
if (hit.getFields().get("_timestamp") != null) { if (hit.getFields().get(timestampfieldname) != null) {
return hit.getFields().get("_timestamp").getValue(); return hit.getFields().get(timestampfieldname).getValue();
} else { } else {
return 0L; return 0L;
} }

View file

@ -197,11 +197,10 @@ public class BulkProcessor implements Closeable {
} }
private void execute() { private void execute() {
final BulkRequest bulkRequest = this.bulkRequest; final BulkRequest myBulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet(); final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = new BulkRequest(); this.bulkRequest = new BulkRequest();
this.bulkRequestHandler.execute(bulkRequest, executionId); this.bulkRequestHandler.execute(myBulkRequest, executionId);
} }
private boolean isOverTheLimit() { private boolean isOverTheLimit() {
@ -372,15 +371,15 @@ public class BulkProcessor implements Closeable {
/** /**
* Abstracts the low-level details of bulk request handling. * Abstracts the low-level details of bulk request handling.
*/ */
abstract class BulkRequestHandler { interface BulkRequestHandler {
public abstract void execute(BulkRequest bulkRequest, long executionId); void execute(BulkRequest bulkRequest, long executionId);
public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
} }
private class SyncBulkRequestHandler extends BulkRequestHandler { private class SyncBulkRequestHandler implements BulkRequestHandler {
private final Client client; private final Client client;
private final BulkProcessor.Listener listener; private final BulkProcessor.Listener listener;
@ -389,6 +388,7 @@ public class BulkProcessor implements Closeable {
this.listener = listener; this.listener = listener;
} }
@Override
public void execute(BulkRequest bulkRequest, long executionId) { public void execute(BulkRequest bulkRequest, long executionId) {
boolean afterCalled = false; boolean afterCalled = false;
try { try {
@ -396,19 +396,20 @@ public class BulkProcessor implements Closeable {
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet(); BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
afterCalled = true; afterCalled = true;
listener.afterBulk(executionId, bulkRequest, bulkResponse); listener.afterBulk(executionId, bulkRequest, bulkResponse);
} catch (Throwable t) { } catch (Exception e) {
if (!afterCalled) { if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, t); listener.afterBulk(executionId, bulkRequest, e);
} }
} }
} }
@Override
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
return true; return true;
} }
} }
private class AsyncBulkRequestHandler extends BulkRequestHandler { private class AsyncBulkRequestHandler implements BulkRequestHandler {
private final Client client; private final Client client;
private final BulkProcessor.Listener listener; private final BulkProcessor.Listener listener;
private final Semaphore semaphore; private final Semaphore semaphore;
@ -452,8 +453,8 @@ public class BulkProcessor implements Closeable {
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
listener.afterBulk(executionId, bulkRequest, e); listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) { } catch (Exception e) {
listener.afterBulk(executionId, bulkRequest, t); listener.afterBulk(executionId, bulkRequest, e);
} finally { } finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
semaphore.release(); semaphore.release();

View file

@ -355,13 +355,14 @@ public interface ClientMethods extends Parameters {
void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep); void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep);
/** /**
* Log the timestamp of the most recently indexed document in the index. * Find the timestamp of the most recently indexed document in the index.
* *
* @param index the index name * @param index the index name
* @param timestampfieldname the timestamp field name
* @return millis UTC millis of the most recent document * @return millis UTC millis of the most recent document
* @throws IOException if most rcent document can not be found * @throws IOException if most rcent document can not be found
*/ */
Long mostRecentDocument(String index) throws IOException; Long mostRecentDocument(String index, String timestampfieldname) throws IOException;
/** /**
* Get metric. * Get metric.

View file

@ -5,6 +5,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder
/** /**
* *
*/ */
@FunctionalInterface
public interface IndexAliasAdder { public interface IndexAliasAdder {
void addIndexAlias(IndicesAliasesRequestBuilder builder, String index, String alias); void addIndexAlias(IndicesAliasesRequestBuilder builder, String index, String alias);

View file

@ -1,5 +1,8 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elasticsearch.extras.client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.Inet6Address; import java.net.Inet6Address;
@ -8,7 +11,6 @@ import java.net.NetworkInterface;
import java.net.SocketException; import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -18,27 +20,30 @@ import java.util.Locale;
*/ */
public class NetworkUtils { public class NetworkUtils {
private static final String IPv4_SETTING = "java.net.preferIPv4Stack"; private static final ESLogger logger = ESLoggerFactory.getLogger(NetworkUtils.class.getName());
private static final String IPv6_SETTING = "java.net.preferIPv6Addresses"; private static final String IPV4_SETTING = "java.net.preferIPv4Stack";
private static final InetAddress localAddress; private static final String IPV6_SETTING = "java.net.preferIPv6Addresses";
private static final InetAddress LOCAL_ADDRESS;
static { static {
InetAddress address; InetAddress address;
try { try {
address = InetAddress.getLocalHost(); address = InetAddress.getLocalHost();
} catch (Throwable e) { } catch (Exception e) {
logger.warn(e.getMessage(), e);
address = InetAddress.getLoopbackAddress(); address = InetAddress.getLoopbackAddress();
} }
localAddress = address; LOCAL_ADDRESS = address;
} }
private NetworkUtils() { private NetworkUtils() {
} }
public static InetAddress getLocalAddress() { public static InetAddress getLocalAddress() {
return localAddress; return LOCAL_ADDRESS;
} }
public static InetAddress getFirstNonLoopbackAddress(ProtocolVersion ipversion) throws SocketException { public static InetAddress getFirstNonLoopbackAddress(ProtocolVersion ipversion) throws SocketException {
@ -49,6 +54,7 @@ public class NetworkUtils {
continue; continue;
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e);
continue; continue;
} }
address = getFirstNonLoopbackAddress(networkInterface, ipversion); address = getFirstNonLoopbackAddress(networkInterface, ipversion);
@ -66,11 +72,9 @@ public class NetworkUtils {
} }
for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) { for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) {
InetAddress address = addresses.nextElement(); InetAddress address = addresses.nextElement();
if (!address.isLoopbackAddress()) { if (!address.isLoopbackAddress() && (address instanceof Inet4Address && ipVersion == ProtocolVersion.IPV4) ||
if ((address instanceof Inet4Address && ipVersion == ProtocolVersion.IPv4) || (address instanceof Inet6Address && ipVersion == ProtocolVersion.IPV6)) {
(address instanceof Inet6Address && ipVersion == ProtocolVersion.IPv6)) { return address;
return address;
}
} }
} }
return null; return null;
@ -83,8 +87,8 @@ public class NetworkUtils {
} }
for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) { for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) {
InetAddress address = addresses.nextElement(); InetAddress address = addresses.nextElement();
if ((address instanceof Inet4Address && ipVersion == ProtocolVersion.IPv4) || if ((address instanceof Inet4Address && ipVersion == ProtocolVersion.IPV4) ||
(address instanceof Inet6Address && ipVersion == ProtocolVersion.IPv6)) { (address instanceof Inet6Address && ipVersion == ProtocolVersion.IPV6)) {
return address; return address;
} }
} }
@ -122,18 +126,20 @@ public class NetworkUtils {
public static ProtocolVersion getProtocolVersion() throws SocketException { public static ProtocolVersion getProtocolVersion() throws SocketException {
switch (findAvailableProtocols()) { switch (findAvailableProtocols()) {
case IPv4: case IPV4:
return ProtocolVersion.IPv4; return ProtocolVersion.IPV4;
case IPv6: case IPV6:
return ProtocolVersion.IPv6; return ProtocolVersion.IPV6;
case IPv46: case IPV46:
if (Boolean.getBoolean(System.getProperty(IPv4_SETTING))) { if (Boolean.getBoolean(System.getProperty(IPV4_SETTING))) {
return ProtocolVersion.IPv4; return ProtocolVersion.IPV4;
} }
if (Boolean.getBoolean(System.getProperty(IPv6_SETTING))) { if (Boolean.getBoolean(System.getProperty(IPV6_SETTING))) {
return ProtocolVersion.IPv6; return ProtocolVersion.IPV6;
} }
return ProtocolVersion.IPv6; return ProtocolVersion.IPV6;
default:
break;
} }
return ProtocolVersion.NONE; return ProtocolVersion.NONE;
} }
@ -150,18 +156,19 @@ public class NetworkUtils {
} }
} }
if (hasIPv4 && hasIPv6) { if (hasIPv4 && hasIPv6) {
return ProtocolVersion.IPv46; return ProtocolVersion.IPV46;
} }
if (hasIPv4) { if (hasIPv4) {
return ProtocolVersion.IPv4; return ProtocolVersion.IPV4;
} }
if (hasIPv6) { if (hasIPv6) {
return ProtocolVersion.IPv6; return ProtocolVersion.IPV6;
} }
return ProtocolVersion.NONE; return ProtocolVersion.NONE;
} }
public static InetAddress resolveInetAddress(String host, String defaultValue) throws IOException { public static InetAddress resolveInetAddress(String hostname, String defaultValue) throws IOException {
String host = hostname;
if (host == null) { if (host == null) {
host = defaultValue; host = defaultValue;
} }
@ -172,23 +179,23 @@ public class NetworkUtils {
} }
if ((host.startsWith("#") && host.endsWith("#")) || (host.startsWith("_") && host.endsWith("_"))) { if ((host.startsWith("#") && host.endsWith("#")) || (host.startsWith("_") && host.endsWith("_"))) {
host = host.substring(1, host.length() - 1); host = host.substring(1, host.length() - 1);
if (host.equals("local")) { if ("local".equals(host)) {
return getLocalAddress(); return getLocalAddress();
} else if (host.startsWith("non_loopback")) { } else if (host.startsWith("non_loopback")) {
if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) { if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) {
return getFirstNonLoopbackAddress(ProtocolVersion.IPv4); return getFirstNonLoopbackAddress(ProtocolVersion.IPV4);
} else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) { } else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) {
return getFirstNonLoopbackAddress(ProtocolVersion.IPv6); return getFirstNonLoopbackAddress(ProtocolVersion.IPV6);
} else { } else {
return getFirstNonLoopbackAddress(getProtocolVersion()); return getFirstNonLoopbackAddress(getProtocolVersion());
} }
} else { } else {
ProtocolVersion protocolVersion = getProtocolVersion(); ProtocolVersion protocolVersion = getProtocolVersion();
if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) { if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) {
protocolVersion = ProtocolVersion.IPv4; protocolVersion = ProtocolVersion.IPV4;
host = host.substring(0, host.length() - 5); host = host.substring(0, host.length() - 5);
} else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) { } else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) {
protocolVersion = ProtocolVersion.IPv6; protocolVersion = ProtocolVersion.IPV6;
host = host.substring(0, host.length() - 5); host = host.substring(0, host.length() - 5);
} }
for (NetworkInterface ni : getAllAvailableInterfaces()) { for (NetworkInterface ni : getAllAvailableInterfaces()) {
@ -227,27 +234,17 @@ public class NetworkUtils {
} }
private static void sortInterfaces(List<NetworkInterface> interfaces) { private static void sortInterfaces(List<NetworkInterface> interfaces) {
Collections.sort(interfaces, new Comparator<NetworkInterface>() { Collections.sort(interfaces, (o1, o2) -> Integer.compare(o1.getIndex(), o2.getIndex()));
@Override
public int compare(NetworkInterface o1, NetworkInterface o2) {
return Integer.compare(o1.getIndex(), o2.getIndex());
}
});
} }
private static void sortAddresses(List<InetAddress> addressList) { private static void sortAddresses(List<InetAddress> addressList) {
Collections.sort(addressList, new Comparator<InetAddress>() { Collections.sort(addressList, (o1, o2) -> compareBytes(o1.getAddress(), o2.getAddress()));
@Override
public int compare(InetAddress o1, InetAddress o2) {
return compareBytes(o1.getAddress(), o2.getAddress());
}
});
} }
private static int compareBytes(byte[] left, byte[] right) { private static int compareBytes(byte[] left, byte[] right) {
for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) { for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
int a = (left[i] & 0xff); int a = left[i] & 0xff;
int b = (right[j] & 0xff); int b = right[j] & 0xff;
if (a != b) { if (a != b) {
return a - b; return a - b;
} }
@ -259,6 +256,6 @@ public class NetworkUtils {
* *
*/ */
public enum ProtocolVersion { public enum ProtocolVersion {
IPv4, IPv6, IPv46, NONE IPV4, IPV6, IPV46, NONE
} }
} }

View file

@ -65,7 +65,7 @@ public class SimpleBulkMetric implements BulkMetric {
@Override @Override
public void start() { public void start() {
this.started = System.nanoTime(); this.started = System.nanoTime();
this.totalIngest.spawn(5L); totalIngest.spawn(5L);
} }
@Override @Override

View file

@ -54,6 +54,8 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL; private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL;
private Node node;
private ElasticsearchClient client; private ElasticsearchClient client;
private BulkProcessor bulkProcessor; private BulkProcessor bulkProcessor;
@ -66,9 +68,6 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
private boolean closed; private boolean closed;
public BulkNodeClient() {
}
@Override @Override
public BulkNodeClient maxActionsPerRequest(int maxActionsPerRequest) { public BulkNodeClient maxActionsPerRequest(int maxActionsPerRequest) {
this.maxActionsPerRequest = maxActionsPerRequest; this.maxActionsPerRequest = maxActionsPerRequest;
@ -196,12 +195,12 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
} }
@Override @Override
protected void createClient(Settings settings) throws IOException { protected synchronized void createClient(Settings settings) throws IOException {
if (client != null) { if (client != null) {
logger.warn("client is open, closing..."); logger.warn("client is open, closing...");
client.threadPool().shutdown(); client.threadPool().shutdown();
logger.warn("client is closed");
client = null; client = null;
node.close();
} }
if (settings != null) { if (settings != null) {
String version = System.getProperty("os.name") String version = System.getProperty("os.name")
@ -216,7 +215,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
logger.info("creating node client on {} with effective settings {}", logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.getAsMap()); version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList(); Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
Node node = new BulkNode(new Environment(effectiveSettings), plugins); this.node = new BulkNode(new Environment(effectiveSettings), plugins);
node.start(); node.start();
this.client = node.client(); this.client = node.client();
} }
@ -230,7 +229,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient index(String index, String type, String id, String source) { public BulkNodeClient index(String index, String type, String id, String source) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
if (metric != null) { if (metric != null) {
@ -248,7 +247,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient bulkIndex(IndexRequest indexRequest) { public BulkNodeClient bulkIndex(IndexRequest indexRequest) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
if (metric != null) { if (metric != null) {
@ -266,7 +265,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient delete(String index, String type, String id) { public BulkNodeClient delete(String index, String type, String id) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
if (metric != null) { if (metric != null) {
@ -284,7 +283,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient bulkDelete(DeleteRequest deleteRequest) { public BulkNodeClient bulkDelete(DeleteRequest deleteRequest) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
if (metric != null) { if (metric != null) {
@ -302,7 +301,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient update(String index, String type, String id, String source) { public BulkNodeClient update(String index, String type, String id, String source) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
if (metric != null) { if (metric != null) {
@ -320,7 +319,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient bulkUpdate(UpdateRequest updateRequest) { public BulkNodeClient bulkUpdate(UpdateRequest updateRequest) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
if (metric != null) { if (metric != null) {
@ -338,7 +337,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient flushIngest() { public BulkNodeClient flushIngest() {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
logger.debug("flushing bulk processor"); logger.debug("flushing bulk processor");
bulkProcessor.flush(); bulkProcessor.flush();
@ -348,7 +347,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException { public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
while (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) { while (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) {
logger.warn("still waiting for responses"); logger.warn("still waiting for responses");
@ -395,6 +394,10 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
} }
metric.stop(); metric.stop();
} }
if (node != null) {
logger.debug("closing node...");
node.close();
}
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
@ -416,7 +419,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient newIndex(String index, Settings settings, Map<String, String> mappings) { public BulkNodeClient newIndex(String index, Settings settings, Map<String, String> mappings) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
if (client == null) { if (client == null) {
logger.warn("no client for create index"); logger.warn("no client for create index");
@ -433,9 +436,11 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
createIndexRequestBuilder.setSettings(settings); createIndexRequestBuilder.setSettings(settings);
} }
if (mappings != null) { if (mappings != null) {
for (String type : mappings.keySet()) { for (Map.Entry<String, String> entry : mappings.entrySet()) {
String type = entry.getKey();
String mapping = entry.getValue();
logger.info("found mapping for {}", type); logger.info("found mapping for {}", type);
createIndexRequestBuilder.addMapping(type, mappings.get(type)); createIndexRequestBuilder.addMapping(type, mapping);
} }
} }
createIndexRequestBuilder.execute().actionGet(); createIndexRequestBuilder.execute().actionGet();
@ -458,7 +463,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
@Override @Override
public BulkNodeClient deleteIndex(String index) { public BulkNodeClient deleteIndex(String index) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
if (client == null) { if (client == null) {
logger.warn("no client"); logger.warn("no client");
@ -488,10 +493,15 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
return settings(); return settings();
} }
@Override
public Settings.Builder getSettingsBuilder() { public Settings.Builder getSettingsBuilder() {
return settingsBuilder(); return settingsBuilder();
} }
private static void throwClose() {
throw new ElasticsearchException("client is closed");
}
private class BulkNode extends Node { private class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) { BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {

View file

@ -28,9 +28,9 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elasticsearch.extras.client.AbstractClient; import org.xbib.elasticsearch.extras.client.AbstractClient;
import org.xbib.elasticsearch.extras.client.BulkProcessor;
import org.xbib.elasticsearch.extras.client.BulkMetric;
import org.xbib.elasticsearch.extras.client.BulkControl; import org.xbib.elasticsearch.extras.client.BulkControl;
import org.xbib.elasticsearch.extras.client.BulkMetric;
import org.xbib.elasticsearch.extras.client.BulkProcessor;
import org.xbib.elasticsearch.extras.client.ClientMethods; import org.xbib.elasticsearch.extras.client.ClientMethods;
import org.xbib.elasticsearch.extras.client.NetworkUtils; import org.xbib.elasticsearch.extras.client.NetworkUtils;
@ -75,9 +75,6 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
private boolean isShutdown; private boolean isShutdown;
public BulkTransportClient() {
}
@Override @Override
public BulkTransportClient init(ElasticsearchClient client, BulkMetric metric, BulkControl control) throws IOException { public BulkTransportClient init(ElasticsearchClient client, BulkMetric metric, BulkControl control) throws IOException {
return init(findSettings(), metric, control); return init(findSettings(), metric, control);
@ -198,7 +195,6 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
logger.warn("client is open, closing..."); logger.warn("client is open, closing...");
client.close(); client.close();
client.threadPool().shutdown(); client.threadPool().shutdown();
logger.warn("client is closed");
client = null; client = null;
} }
if (settings != null) { if (settings != null) {
@ -257,7 +253,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public ClientMethods newIndex(String index) { public ClientMethods newIndex(String index) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
return newIndex(index, null, null); return newIndex(index, null, null);
} }
@ -273,11 +269,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public ClientMethods newIndex(String index, Settings settings, Map<String, String> mappings) { public ClientMethods newIndex(String index, Settings settings, Map<String, String> mappings) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
}
if (client == null) {
logger.warn("no client for create index");
return this;
} }
if (index == null) { if (index == null) {
logger.warn("no index name given to create index"); logger.warn("no index name given to create index");
@ -290,9 +282,11 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
createIndexRequestBuilder.setSettings(settings); createIndexRequestBuilder.setSettings(settings);
} }
if (mappings != null) { if (mappings != null) {
for (String type : mappings.keySet()) { for (Map.Entry<String, String> entry : mappings.entrySet()) {
String type = entry.getKey();
String mapping = entry.getValue();
logger.info("found mapping for {}", type); logger.info("found mapping for {}", type);
createIndexRequestBuilder.addMapping(type, mappings.get(type)); createIndexRequestBuilder.addMapping(type, mapping);
} }
} }
createIndexRequestBuilder.execute().actionGet(); createIndexRequestBuilder.execute().actionGet();
@ -303,11 +297,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public ClientMethods deleteIndex(String index) { public ClientMethods deleteIndex(String index) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
}
if (client == null) {
logger.warn("no client for delete index");
return this;
} }
if (index == null) { if (index == null) {
logger.warn("no index name given to delete index"); logger.warn("no index name given to delete index");
@ -345,7 +335,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public BulkTransportClient index(String index, String type, String id, String source) { public BulkTransportClient index(String index, String type, String id, String source) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
@ -361,7 +351,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public BulkTransportClient bulkIndex(IndexRequest indexRequest) { public BulkTransportClient bulkIndex(IndexRequest indexRequest) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
@ -377,7 +367,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public BulkTransportClient delete(String index, String type, String id) { public BulkTransportClient delete(String index, String type, String id) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
@ -393,7 +383,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public BulkTransportClient bulkDelete(DeleteRequest deleteRequest) { public BulkTransportClient bulkDelete(DeleteRequest deleteRequest) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
@ -409,7 +399,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public BulkTransportClient update(String index, String type, String id, String source) { public BulkTransportClient update(String index, String type, String id, String source) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
@ -425,7 +415,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public BulkTransportClient bulkUpdate(UpdateRequest updateRequest) { public BulkTransportClient bulkUpdate(UpdateRequest updateRequest) {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
} }
try { try {
metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
@ -441,11 +431,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
@Override @Override
public synchronized BulkTransportClient flushIngest() { public synchronized BulkTransportClient flushIngest() {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
}
if (client == null) {
logger.warn("no client");
return this;
} }
logger.debug("flushing bulk processor"); logger.debug("flushing bulk processor");
bulkProcessor.flush(); bulkProcessor.flush();
@ -456,11 +442,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
if (closed) { if (closed) {
throw new ElasticsearchException("client is closed"); throwClose();
}
if (client == null) {
logger.warn("no client");
return this;
} }
bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
return this; return this;
@ -470,11 +452,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
public synchronized void shutdown() { public synchronized void shutdown() {
if (closed) { if (closed) {
shutdownClient(); shutdownClient();
throw new ElasticsearchException("client is closed"); throwClose();
}
if (client == null) {
logger.warn("no client");
return;
} }
try { try {
if (bulkProcessor != null) { if (bulkProcessor != null) {
@ -532,7 +510,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
try { try {
port = Integer.parseInt(splitHost[1]); port = Integer.parseInt(splitHost[1]);
} catch (Exception e) { } catch (Exception e) {
// ignore logger.warn(e.getMessage(), e);
} }
addresses.add(new InetSocketTransportAddress(inetAddress, port)); addresses.add(new InetSocketTransportAddress(inetAddress, port));
} }
@ -545,6 +523,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
return addresses; return addresses;
} }
private static void throwClose() {
throw new ElasticsearchException("client is closed");
}
private void shutdownClient() { private void shutdownClient() {
if (client != null) { if (client != null) {
logger.debug("shutdown started"); logger.debug("shutdown started");

View file

@ -19,9 +19,6 @@ import java.util.Map;
*/ */
public class MockTransportClient extends BulkTransportClient { public class MockTransportClient extends BulkTransportClient {
public MockTransportClient() {
}
@Override @Override
public ElasticsearchClient client() { public ElasticsearchClient client() {
return null; return null;
@ -124,18 +121,22 @@ public class MockTransportClient extends BulkTransportClient {
@Override @Override
public void putMapping(String index) { public void putMapping(String index) {
// mockup method
} }
@Override @Override
public void refreshIndex(String index) { public void refreshIndex(String index) {
// mockup method
} }
@Override @Override
public void flushIndex(String index) { public void flushIndex(String index) {
// mockup method
} }
@Override @Override
public void waitForCluster(String healthColor, TimeValue timeValue) throws IOException { public void waitForCluster(String healthColor, TimeValue timeValue) throws IOException {
// mockup method
} }
@Override @Override
@ -150,7 +151,7 @@ public class MockTransportClient extends BulkTransportClient {
@Override @Override
public void shutdown() { public void shutdown() {
// do nothing // mockup method
} }
} }

View file

@ -4,7 +4,6 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -32,9 +31,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.settings.SettingsModule;
@ -49,7 +45,6 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchModule;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler; import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
@ -67,9 +62,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Stripped-down transport client without node sampling. * Stripped-down transport client without node sampling and without retrying.
*
* Merged together: original TransportClient, TransportClientNodesServce, TransportClientProxy * Merged together: original TransportClient, TransportClientNodesServce, TransportClientProxy
* Configurable ping interval setting added
* Configurable connect ping interval setting added.
*/ */
public class TransportClient extends AbstractClient { public class TransportClient extends AbstractClient {
@ -260,7 +257,7 @@ public class TransportClient extends AbstractClient {
try { try {
injector.getInstance(MonitorService.class).close(); injector.getInstance(MonitorService.class).close();
} catch (Exception e) { } catch (Exception e) {
// ignore, might not be bounded logger.debug(e.getMessage(), e);
} }
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).nodeServices()) { for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).nodeServices()) {
injector.getInstance(plugin).close(); injector.getInstance(plugin).close();
@ -268,7 +265,7 @@ public class TransportClient extends AbstractClient {
try { try {
ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS); ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
// ignore logger.debug(e.getMessage(), e);
} }
injector.getInstance(PageCacheRecycler.class).close(); injector.getInstance(PageCacheRecycler.class).close();
} }
@ -281,7 +278,7 @@ public class TransportClient extends AbstractClient {
try { try {
logger.trace("connecting to listed node (light) [{}]", listedNode); logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode); transportService.connectToNodeLight(listedNode);
} catch (Throwable e) { } catch (Exception e) {
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
continue; continue;
} }
@ -310,7 +307,7 @@ public class TransportClient extends AbstractClient {
listedNode); listedNode);
newNodes.add(listedNode); newNodes.add(listedNode);
} }
} catch (Throwable e) { } catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode); logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode); transportService.disconnectFromNode(listedNode);
} }
@ -321,7 +318,7 @@ public class TransportClient extends AbstractClient {
try { try {
logger.trace("connecting to node [{}]", node); logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node); transportService.connectToNode(node);
} catch (Throwable e) { } catch (Exception e) {
it.remove(); it.remove();
logger.debug("failed to connect to discovered node [" + node + "]", e); logger.debug("failed to connect to discovered node [" + node + "]", e);
} }
@ -333,22 +330,14 @@ public class TransportClient extends AbstractClient {
@Override @Override
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
protected <Request extends ActionRequest, Response extends ActionResponse, protected <R extends ActionRequest, S extends ActionResponse, T extends ActionRequestBuilder<R, S, T>>
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<R, S, T> action, final R request, final ActionListener<S> listener) {
void doExecute(Action<Request, Response, RequestBuilder> action, final Request request, final TransportActionNodeProxy<R, S> proxyAction = proxyActionMap.getProxies().get(action);
ActionListener<Response> listener) {
final TransportActionNodeProxy<Request, Response> proxyAction = proxyActionMap.getProxies().get(action);
if (proxyAction == null) { if (proxyAction == null) {
throw new IllegalStateException("undefined action " + action); throw new IllegalStateException("undefined action " + action);
} }
NodeListenerCallback<Response> callback = new NodeListenerCallback<Response>() { List<DiscoveryNode> nodeList = this.nodes;
@Override if (nodeList.isEmpty()) {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxyAction.execute(node, request, listener);
}
};
List<DiscoveryNode> nodes = this.nodes;
if (nodes.isEmpty()) {
throw new NoNodeAvailableException("none of the configured nodes are available: " + this.listedNodes); throw new NoNodeAvailableException("none of the configured nodes are available: " + this.listedNodes);
} }
int index = nodeCounter.incrementAndGet(); int index = nodeCounter.incrementAndGet();
@ -356,24 +345,14 @@ public class TransportClient extends AbstractClient {
index = 0; index = 0;
nodeCounter.set(0); nodeCounter.set(0);
} }
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index); // try once and never more
DiscoveryNode node = nodes.get((index) % nodes.size());
try { try {
callback.doWithNode(node, retryListener); proxyAction.execute(nodeList.get(index % nodeList.size()), request, listener);
} catch (Throwable t) { } catch (Exception e) {
listener.onFailure(t); listener.onFailure(e);
} }
} }
/**
*
* @param <Response>
*/
interface NodeListenerCallback<Response> {
void doWithNode(DiscoveryNode node, ActionListener<Response> listener);
}
/** /**
* *
*/ */
@ -397,19 +376,17 @@ public class TransportClient extends AbstractClient {
} }
public TransportClient build() { public TransportClient build() {
Settings settings = InternalSettingsPreparer.prepareSettings(this.settings); Settings transportClientSettings = settingsBuilder()
settings = settingsBuilder()
.put("transport.ping.schedule", this.settings.get("ping.interval", "30s")) .put("transport.ping.schedule", this.settings.get("ping.interval", "30s"))
.put(settings) .put(InternalSettingsPreparer.prepareSettings(this.settings))
.put("network.server", false) .put("network.server", false)
.put("node.client", true) .put("node.client", true)
.put(CLIENT_TYPE_SETTING, CLIENT_TYPE) .put(CLIENT_TYPE_SETTING, CLIENT_TYPE)
.build(); .build();
PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses); PluginsService pluginsService = new PluginsService(transportClientSettings, null, null, pluginClasses);
this.settings = pluginsService.updatedSettings(); this.settings = pluginsService.updatedSettings();
Version version = Version.CURRENT; Version version = Version.CURRENT;
final ThreadPool threadPool = new ThreadPool(settings); final ThreadPool threadPool = new ThreadPool(transportClientSettings);
boolean success = false; boolean success = false;
try { try {
ModulesBuilder modules = new ModulesBuilder(); ModulesBuilder modules = new ModulesBuilder();
@ -447,49 +424,6 @@ public class TransportClient extends AbstractClient {
} }
} }
private static class RetryListener<Response> implements ActionListener<Response> {
private final ESLogger logger = ESLoggerFactory.getLogger(RetryListener.class.getName());
private final NodeListenerCallback<Response> callback;
private final ActionListener<Response> listener;
private final List<DiscoveryNode> nodes;
private final int index;
private volatile int n;
RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
List<DiscoveryNode> nodes, int index) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
this.index = index;
}
@Override
public void onResponse(Response response) {
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int n = ++this.n;
if (n >= nodes.size()) {
listener.onFailure(new NoNodeAvailableException("none of the configured nodes were available: "
+ nodes, e));
} else {
try {
logger.warn("retrying on another node (n={}, nodes={})", n, nodes.size());
callback.doWithNode(nodes.get((index + n) % nodes.size()), this);
} catch (final Throwable t) {
listener.onFailure(t);
}
}
} else {
listener.onFailure(e);
}
}
}
/** /**
* The {@link ProxyActionMap} must be declared public. * The {@link ProxyActionMap} must be declared public.
*/ */

View file

@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="[%d{ABSOLUTE}][%-5p][%-25c][%t] %m%n"/>
</Console>
</appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</configuration>