align tests, loggers, sonarqube issues

This commit is contained in:
Jörg Prante 2016-11-19 16:43:57 +01:00
parent acbfdb8f4c
commit 1888b3630f
25 changed files with 193 additions and 106 deletions

View file

@ -6,7 +6,7 @@ plugins {
}
group = 'org.xbib'
version = '5.0.1.0'
version = '5.0.1.1'
printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" +
"Build: group: ${project.group} name: ${project.name} version: ${project.version}\n",
@ -56,7 +56,13 @@ configurations {
dependencies {
compile "org.xbib:metrics:1.0.0"
compile "org.elasticsearch.client:transport:5.0.1"
compile("org.elasticsearch.client:transport:5.0.1") {
exclude group: 'org.elasticsearch', module: 'securesm'
exclude group: 'org.elasticsearch.plugin', module: 'transport-netty3-client'
exclude group: 'org.elasticsearch.plugin', module: 'reindex-client'
exclude group: 'org.elasticsearch.plugin', module: 'percolator-client'
exclude group: 'org.elasticsearch.plugin', module: 'lang-mustache-client'
}
compile "org.apache.logging.log4j:log4j-api:2.7"
testCompile "junit:junit:4.12"
testCompile "org.apache.logging.log4j:log4j-core:2.7"

View file

@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class NodeTestBase {
protected static final Logger logger = LogManager.getLogger("test");
private static final Logger logger = LogManager.getLogger("test");
private static final Random random = new Random();

View file

@ -1,4 +1,4 @@
package org.xbib.elasticsearch;
package org.xbib.elasticsearch.extras.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.common.Strings;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestBase;
import java.io.IOException;
import java.util.Collections;
@ -72,12 +73,10 @@ public class AliasTest extends NodeTestBase {
Set<String> result = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
Matcher m = pattern.matcher(indexName.value);
if (m.matches()) {
if (alias.equals(m.group(1))) {
if (m.matches() && alias.equals(m.group(1))) {
result.add(indexName.value);
}
}
}
Iterator<String> it = result.iterator();
assertEquals("test20160103", it.next());
assertEquals("test20160102", it.next());

View file

@ -1,4 +1,6 @@
package org.xbib.elasticsearch.extras.client.node;
package org.xbib.elasticsearch.extras.client;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -11,14 +13,12 @@ import org.junit.Before;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestBase;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
*
*/
public class BulkNodeClusterBlockTest extends NodeTestBase {
public class ClusterBlockTest extends NodeTestBase {
private static final Logger logger = LogManager.getLogger(BulkNodeClusterBlockTest.class.getName());
private static final Logger logger = LogManager.getLogger(ClusterBlockTest.class.getName());
@Before
public void startNodes() {

View file

@ -1,4 +1,4 @@
package org.xbib.elasticsearch;
package org.xbib.elasticsearch.extras.client;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
@ -15,6 +15,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestBase;
/**
*

View file

@ -1,4 +1,4 @@
package org.xbib.elasticsearch;
package org.xbib.elasticsearch.extras.client;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
@ -13,6 +13,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestBase;
/**
*
@ -39,7 +40,8 @@ public class SimpleTest extends NodeTestBase {
} catch (Exception e) {
// ignore
}
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client("1"), CreateIndexAction.INSTANCE)
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client("1"),
CreateIndexAction.INSTANCE)
.setIndex("test")
.setSettings(Settings.builder()
.put("index.analysis.analyzer.default.filter.0", "lowercase")

View file

@ -1,4 +1,4 @@
package org.xbib.elasticsearch;
package org.xbib.elasticsearch.extras.client;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -9,6 +9,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestBase;
import java.io.IOException;

View file

@ -6,14 +6,12 @@ import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.ExecutorServices;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -28,7 +26,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@ -68,7 +65,7 @@ public class BulkNodeClientTest extends NodeTestBase {
}
@Test
public void testMappingNodeClient() throws Exception {
public void testBulkNodeClientMapping() throws Exception {
final BulkNodeClient client = ClientBuilder.builder()
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5))
.setMetric(new SimpleBulkMetric())
@ -98,7 +95,7 @@ public class BulkNodeClientTest extends NodeTestBase {
}
@Test
public void testSingleDocNodeClient() {
public void testBulkNodeClientSingleDoc() {
final BulkNodeClient client = ClientBuilder.builder()
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(30))
@ -127,7 +124,7 @@ public class BulkNodeClientTest extends NodeTestBase {
}
@Test
public void testRandomDocsNodeClient() throws Exception {
public void testBulkNodeClientRandomDocs() throws Exception {
long numactions = NUM_ACTIONS;
final BulkNodeClient client = ClientBuilder.builder()
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
@ -155,7 +152,7 @@ public class BulkNodeClientTest extends NodeTestBase {
}
@Test
public void testThreadedRandomDocsNodeClient() throws Exception {
public void testBulkNodeClientThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors();
Long maxactions = MAX_ACTIONS;
final Long maxloop = NUM_ACTIONS;

View file

@ -1,5 +1,10 @@
package org.xbib.elasticsearch.extras.client.node;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
@ -12,9 +17,6 @@ import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
/**
*
*/

View file

@ -1,5 +1,7 @@
package org.xbib.elasticsearch.extras.client.node;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
@ -17,8 +19,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertFalse;
/**
*
*/

View file

@ -1,5 +1,9 @@
package org.xbib.elasticsearch.extras.client.node;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
@ -22,16 +26,12 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkNodeReplicaTest extends NodeTestBase {
private final static Logger logger = LogManager.getLogger(BulkNodeReplicaTest.class.getName());
private static final Logger logger = LogManager.getLogger(BulkNodeReplicaTest.class.getName());
@Test
public void testReplicaLevel() throws Exception {
@ -80,7 +80,8 @@ public class BulkNodeReplicaTest extends NodeTestBase {
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
logger.info("query total hits={}", hits);
assertEquals(2468, hits);
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.client(), IndicesStatsAction.INSTANCE)
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.client(),
IndicesStatsAction.INSTANCE)
.all();
IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet();
for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) {

View file

@ -1,5 +1,8 @@
package org.xbib.elasticsearch.extras.client.node;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -11,9 +14,6 @@ import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
*
*/

View file

@ -1,10 +1,20 @@
package org.xbib.elasticsearch.extras.client.transport;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before;
import org.junit.Test;
@ -12,6 +22,7 @@ import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import org.xbib.elasticsearch.extras.client.node.BulkNodeClient;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@ -20,14 +31,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkTransportClientTest extends NodeTestBase {
private static final Logger logger = LogManager.getLogger(BulkTransportClientTest.class.getName());
private static final Long MAX_ACTIONS = 1000L;
private static final Long NUM_ACTIONS = 1234L;
@ -43,7 +53,7 @@ public class BulkTransportClientTest extends NodeTestBase {
}
@Test
public void testBulkClientIndexCreation() throws IOException {
public void testBulkTransportClientNewIndex() throws IOException {
logger.info("firing up BulkTransportClient");
final BulkTransportClient client = ClientBuilder.builder()
.put(getClientSettings())
@ -75,7 +85,38 @@ public class BulkTransportClientTest extends NodeTestBase {
}
@Test
public void testSingleDocBulkClient() throws IOException {
public void testBulkTransportClientMapping() throws Exception {
final BulkTransportClient client = ClientBuilder.builder()
.put(getClientSettings())
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5))
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
.toBulkTransportClient();
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject("test")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
client.mapping("test", builder.string());
client.newIndex("test");
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
GetMappingsResponse getMappingsResponse =
client.client().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
}
@Test
public void testBulkTransportClientSingleDoc() throws IOException {
logger.info("firing up BulkTransportClient");
final BulkTransportClient client = ClientBuilder.builder()
.put(getClientSettings())
@ -111,7 +152,7 @@ public class BulkTransportClientTest extends NodeTestBase {
}
@Test
public void testRandomDocsBulkClient() {
public void testBulkTransportClientRandomDocs() {
long numactions = NUM_ACTIONS;
final BulkTransportClient client = ClientBuilder.builder()
.put(getClientSettings())
@ -147,7 +188,7 @@ public class BulkTransportClientTest extends NodeTestBase {
}
@Test
public void testThreadedRandomDocsBulkClient() {
public void testBulkTransportClientThreadedRandomDocs() {
int maxthreads = Runtime.getRuntime().availableProcessors();
long maxactions = MAX_ACTIONS;
final long maxloop = NUM_ACTIONS;

View file

@ -1,5 +1,12 @@
package org.xbib.elasticsearch.extras.client.transport;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -10,17 +17,16 @@ import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
/**
*
*/
public class BulkTransportDuplicateIDTest extends NodeTestBase {
private final static Long MAX_ACTIONS = 1000L;
private static final Logger logger = LogManager.getLogger(BulkTransportDuplicateIDTest.class.getName());
private final static Long NUM_ACTIONS = 12345L;
private static final Long MAX_ACTIONS = 1000L;
private static final Long NUM_ACTIONS = 12345L;
@Test
public void testDuplicateDocIDs() throws Exception {

View file

@ -1,6 +1,17 @@
package org.xbib.elasticsearch.extras.client.transport;
import org.elasticsearch.action.admin.indices.stats.*;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -15,15 +26,13 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkTransportReplicaTest extends NodeTestBase {
private static final Logger logger = LogManager.getLogger(BulkTransportClientTest.class.getName());
@Test
public void testReplicaLevel() throws Exception {

View file

@ -1,5 +1,8 @@
package org.xbib.elasticsearch.extras.client.transport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -11,9 +14,6 @@ import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
*
*/

View file

@ -0,0 +1,4 @@
/**
* Classes for testing extras for transport client.
*/
package org.xbib.elasticsearch.extras.client.transport;

View file

@ -6,6 +6,9 @@ import org.junit.runners.Suite;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.RunnerBuilder;
/**
*
*/
public class ListenerSuite extends Suite {
private final TestListener listener = new TestListener();

View file

@ -2,10 +2,10 @@ package suites;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.xbib.elasticsearch.AliasTest;
import org.xbib.elasticsearch.SearchTest;
import org.xbib.elasticsearch.SimpleTest;
import org.xbib.elasticsearch.WildcardTest;
import org.xbib.elasticsearch.extras.client.AliasTest;
import org.xbib.elasticsearch.extras.client.SearchTest;
import org.xbib.elasticsearch.extras.client.SimpleTest;
import org.xbib.elasticsearch.extras.client.WildcardTest;
/**
*

View file

@ -0,0 +1,4 @@
/**
* Test suites.
*/
package suites;

View file

@ -1,9 +1,9 @@
package org.xbib.elasticsearch.extras.client;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;

View file

@ -101,6 +101,9 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
metric.start();
}
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
private final Logger logger = LogManager.getLogger(BulkNodeClient.class.getName() + ".Listener");
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = -1;

View file

@ -27,7 +27,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.Netty4Plugin;
import org.xbib.elasticsearch.extras.client.AbstractClient;
import org.xbib.elasticsearch.extras.client.BulkControl;
@ -54,8 +53,6 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
private static final Logger logger = LogManager.getLogger(BulkTransportClient.class.getName());
private static final Settings DEFAULT_SETTINGS = Settings.builder().put("transport.type.default", "local").build();
private int maxActionsPerRequest = DEFAULT_MAX_ACTIONS_PER_REQUEST;
private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
@ -95,6 +92,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
}
resetSettings();
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
private final Logger logger = LogManager.getLogger(BulkTransportClient.class.getName() + ".Listener");
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = -1L;
@ -170,7 +170,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
builder.setBulkSize(maxVolumePerRequest);
}
this.bulkProcessor = builder.build();
// aut-connect here
// auto-connect here
try {
Collection<InetSocketTransportAddress> addrs = findAddresses(settings);
if (!connect(addrs, settings.getAsBoolean("autodiscover", false))) {

View file

@ -91,15 +91,14 @@ public class TransportClient extends AbstractClient {
private final Object mutex = new Object();
private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();
private volatile List<DiscoveryNode> nodes = Collections.emptyList();
private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();
private volatile boolean closed;
/**
* Creates a new TransportClient with the given settings and plugins.
* @param settings settings
@ -197,37 +196,44 @@ public class TransportClient extends AbstractClient {
return this;
}
/**
* Adds a list of transport addresses that will be used to connect to.
* The Node this transport address represents will be used if its possible to connect to it.
* If it is unavailable, it will be automatically connected to once it is up.
* In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
*
* @param transportAddresses transport addressses
* @return this transport client
*/
public TransportClient addTransportAddresses(Collection<InetSocketTransportAddress> transportAddresses) {
synchronized (mutex) {
if (closed) {
throw new IllegalStateException("transport client is closed, can't add addresses");
}
List<TransportAddress> filtered = new ArrayList<>(transportAddresses.size());
for (TransportAddress transportAddress : transportAddresses) {
Set<DiscoveryNode> discoveryNodeList = new HashSet<>();
discoveryNodeList.addAll(listedNodes);
logger.debug("before adding: nodes={} listednodes={} transportAddresses={}",
nodes, listedNodes, transportAddresses);
for (TransportAddress newTransportAddress : transportAddresses) {
boolean found = false;
for (DiscoveryNode otherNode : listedNodes) {
if (otherNode.getAddress().equals(transportAddress)) {
for (DiscoveryNode discoveryNode : discoveryNodeList) {
logger.debug("checking existing address [{}] against new [{}]",
discoveryNode.getAddress(), newTransportAddress);
if (discoveryNode.getAddress().sameHost(newTransportAddress)) {
found = true;
logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
logger.debug("address [{}] already connected, ignoring", newTransportAddress, discoveryNode);
break;
}
}
if (!found) {
filtered.add(transportAddress);
}
}
if (filtered.isEmpty()) {
return this;
}
List<DiscoveryNode> discoveryNodeList = new ArrayList<>();
discoveryNodeList.addAll(listedNodes());
for (TransportAddress transportAddress : filtered) {
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeId.incrementAndGet(), transportAddress,
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeId.incrementAndGet(),
newTransportAddress,
Version.CURRENT.minimumCompatibilityVersion());
logger.debug("adding address [{}]", node);
discoveryNodeList.add(node);
}
listedNodes = Collections.unmodifiableList(discoveryNodeList);
}
listedNodes = Collections.unmodifiableList(new ArrayList<>(discoveryNodeList));
connect();
}
return this;
@ -265,13 +271,16 @@ public class TransportClient extends AbstractClient {
return;
}
closed = true;
logger.debug("disconnecting from nodes {}", nodes);
for (DiscoveryNode node : nodes) {
transportService.disconnectFromNode(node);
}
nodes = Collections.emptyList();
logger.debug("disconnecting from listed nodes {}", listedNodes);
for (DiscoveryNode listedNode : listedNodes) {
transportService.disconnectFromNode(listedNode);
}
nodes = Collections.emptyList();
listedNodes = Collections.emptyList();
}
injector.getInstance(TransportService.class).close();
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).getGuiceServiceClasses()) {
@ -290,7 +299,7 @@ public class TransportClient extends AbstractClient {
for (DiscoveryNode listedNode : listedNodes) {
if (!transportService.nodeConnected(listedNode)) {
try {
logger.trace("connecting to listed node (light) [{}]", listedNode);
logger.debug("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode);
} catch (Exception e) {
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
@ -309,7 +318,7 @@ public class TransportClient extends AbstractClient {
}
}).txGet();
if (!clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
logger.warn("node {} not part of the cluster {}, ignoring", listedNode, clusterName);
newFilteredNodes.add(listedNode);
} else if (livenessResponse.getDiscoveryNode() != null) {
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
@ -323,7 +332,7 @@ public class TransportClient extends AbstractClient {
newNodes.add(listedNode);
}
} catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
logger.info("failed to get node info for {}, disconnecting", e, listedNode);
transportService.disconnectFromNode(listedNode);
}
}
@ -331,7 +340,7 @@ public class TransportClient extends AbstractClient {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace("connecting to node [{}]", node);
logger.debug("connecting to new node [{}]", node);
transportService.connectToNode(node);
} catch (Exception e) {
it.remove();
@ -340,6 +349,7 @@ public class TransportClient extends AbstractClient {
}
}
this.nodes = Collections.unmodifiableList(new ArrayList<>(newNodes));
logger.debug("connected to {} nodes", nodes.size());
this.filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
@ -392,10 +402,13 @@ public class TransportClient extends AbstractClient {
}
private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
private static ClientTemplate buildTemplate(Settings givenSettings, Settings defaultSettings,
Collection<Class<? extends Plugin>> plugins) {
Settings providedSettings = givenSettings;
if (!Node.NODE_NAME_SETTING.exists(providedSettings)) {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
providedSettings = Settings.builder().put(providedSettings)
.put(Node.NODE_NAME_SETTING.getKey(), "_client_")
.build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
@ -423,7 +436,7 @@ public class TransportClient extends AbstractClient {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
// plugin modules must be added here, before others or we can get crazy injection errors
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
@ -457,8 +470,7 @@ public class TransportClient extends AbstractClient {
resourcesToClose.addAll(pluginLifecycleComponents);
transportService.start();
transportService.acceptIncomingRequests();
ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents,
proxy, namedWriteableRegistry);
ClientTemplate transportClient = new ClientTemplate(injector, proxy);
resourcesToClose.clear();
return transportClient;
} finally {
@ -482,16 +494,12 @@ public class TransportClient extends AbstractClient {
private static final class ClientTemplate {
final Injector injector;
private final List<LifecycleComponent> pluginLifecycleComponents;
private final ProxyActionMap proxy;
private final NamedWriteableRegistry namedWriteableRegistry;
private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
ProxyActionMap proxy, NamedWriteableRegistry namedWriteableRegistry) {
private ClientTemplate(Injector injector,
ProxyActionMap proxy) {
this.injector = injector;
this.pluginLifecycleComponents = pluginLifecycleComponents;
this.proxy = proxy;
this.namedWriteableRegistry = namedWriteableRegistry;
}
Settings getSettings() {