update to Elasticearch 5.0.1

This commit is contained in:
Jörg Prante 2016-11-16 22:35:33 +01:00
parent 409ced9c7a
commit acbfdb8f4c
24 changed files with 488 additions and 390 deletions

View file

@ -99,7 +99,7 @@ You will need Java 8, although Elasticsearch 2.x requires Java 7. Java 7 is not
## Dependencies
This project depends only on https://github.com/xbib/metrics which is a slim version of Coda Hale's metrics library,
and Elasticsearch.
Elasticsearch, and Log4j2 API.
## How to decode the Elasticsearch version

View file

@ -6,7 +6,7 @@ plugins {
}
group = 'org.xbib'
version = '2.2.1.1'
version = '5.0.1.0'
printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" +
"Build: group: ${project.group} name: ${project.name} version: ${project.version}\n",
@ -56,11 +56,10 @@ configurations {
dependencies {
compile "org.xbib:metrics:1.0.0"
compile "org.elasticsearch:elasticsearch:2.2.1"
testCompile "net.java.dev.jna:jna:4.1.0"
compile "org.elasticsearch.client:transport:5.0.1"
compile "org.apache.logging.log4j:log4j-api:2.7"
testCompile "junit:junit:4.12"
testCompile "org.apache.logging.log4j:log4j-core:2.7"
testCompile "org.apache.logging.log4j:log4j-slf4j-impl:2.7"
wagon 'org.apache.maven.wagon:wagon-ssh-external:2.10'
}
@ -69,7 +68,7 @@ tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:all" << "-profile" << "compact3"
}
task integrationTest(type: Test) {
task integrationTest(type: Test, group: 'verification') {
include '**/MiscTestSuite.class'
include '**/BulkNodeTestSuite.class'
include '**/BulkTransportTestSuite.class'
@ -81,7 +80,7 @@ task integrationTest(type: Test) {
classpath += sourceSets.integrationTest.output
outputs.upToDateWhen { false }
systemProperty 'path.home', projectDir.absolutePath
testLogging.showStandardStreams = true
testLogging.showStandardStreams = false
}
integrationTest.mustRunAfter test

View file

@ -1,6 +1,5 @@
package org.elasticsearch.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
@ -21,14 +20,14 @@ public class MockNode extends Node {
super(settings);
}
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
}
public MockNode(Settings settings, Class<? extends Plugin> classpathPlugin) {
this(settings, list(classpathPlugin));
}
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);
}
private static Collection<Class<? extends Plugin>> list(Class<? extends Plugin> classpathPlugin) {
Collection<Class<? extends Plugin>> list = new ArrayList<>();
list.add(classpathPlugin);

View file

@ -4,16 +4,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.junit.Test;
import java.io.IOException;
@ -27,9 +26,9 @@ import java.util.regex.Pattern;
/**
*
*/
public class AliasTest extends NodeTestUtils {
public class AliasTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger(AliasTest.class.getName());
private static final Logger logger = LogManager.getLogger(AliasTest.class.getName());
@Test
public void testAlias() throws IOException {
@ -37,11 +36,9 @@ public class AliasTest extends NodeTestUtils {
client("1").admin().indices().create(indexRequest).actionGet();
// put alias
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[]{"test"};
String[] aliases = new String[]{"test_alias"};
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);
indicesAliasesRequest.addAliasAction(aliasAction);
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
.index("test").alias("test_alias")
);
client("1").admin().indices().aliases(indicesAliasesRequest).actionGet();
// get alias
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY);
@ -62,11 +59,10 @@ public class AliasTest extends NodeTestUtils {
indexRequest = new CreateIndexRequest("test20160103");
client("1").admin().indices().create(indexRequest).actionGet();
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[]{"test20160101", "test20160102", "test20160103"};
String[] aliases = new String[]{alias};
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);
indicesAliasesRequest.addAliasAction(aliasAction);
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
.indices("test20160101", "test20160102", "test20160103")
.alias(alias)
);
client("1").admin().indices().aliases(indicesAliasesRequest).actionGet();
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client("1"),

View file

@ -1,8 +1,7 @@
package org.xbib.elasticsearch;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -10,13 +9,14 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.After;
import org.junit.Before;
import org.xbib.elasticsearch.extras.client.NetworkUtils;
@ -32,13 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class NodeTestUtils {
public class NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger("test");
protected static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random();
private static final Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
@ -46,39 +46,20 @@ public class NodeTestUtils {
private AtomicInteger counter = new AtomicInteger();
private String cluster;
private String clustername;
private String host;
private int port;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(System.getProperty("path.home") + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
@Before
public void startNodes() {
try {
logger.info("starting");
logger.info("settings cluster name");
setClusterName();
logger.info("starting nodes");
startNode("1");
findNodeAddress();
try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
@ -86,19 +67,18 @@ public class NodeTestUtils {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
}
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
logger.info("nodes are started");
} catch (Throwable t) {
logger.error("startNodes failed", t);
logger.error("start of nodes failed", t);
}
}
@After
public void stopNodes() {
try {
logger.info("stopping nodes");
closeNodes();
} catch (Exception e) {
} catch (Throwable e) {
logger.error("can not close nodes", e);
} finally {
try {
@ -114,37 +94,43 @@ public class NodeTestUtils {
}
protected void setClusterName() {
this.cluster = "test-helper-cluster-"
this.clustername = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("user.name")
+ "-" + counter.incrementAndGet();
}
protected String getClusterName() {
return cluster;
}
protected Settings getSettings() {
return settingsBuilder()
.put("host", host)
.put("port", port)
.put("cluster.name", cluster)
.put("path.home", getHome())
.build();
return clustername;
}
protected Settings getNodeSettings() {
return settingsBuilder()
.put("cluster.name", cluster)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
.put("discovery.zen.multicast.ping_timeout", "5s")
.put("http.enabled", true)
.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
.put("index.number_of_replicas", 0)
String hostname = NetworkUtils.getLocalAddress().getHostName();
return Settings.builder()
.put("cluster.name", clustername)
// required to build a cluster, replica tests will test this.
.put("discovery.zen.ping.unicast.hosts", hostname)
.put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
.put("network.host", hostname)
.put("http.enabled", false)
.put("path.home", getHome())
// maximum five nodes on same host
.put("node.max_local_storage_nodes", 5)
.put("thread_pool.bulk.size", Runtime.getRuntime().availableProcessors())
// default is 50 which is too low
.put("thread_pool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors())
.build();
}
protected Settings getClientSettings() {
if (host == null) {
throw new IllegalStateException("host is null");
}
// the host to which transport client should connect to
return Settings.builder()
.put("cluster.name", clustername)
.put("host", host + ":" + port)
.build();
}
@ -153,7 +139,11 @@ public class NodeTestUtils {
}
public void startNode(String id) throws IOException {
try {
buildNode(id).start();
} catch (NodeValidationException e) {
throw new IOException(e);
}
}
public AbstractClient client(String id) {
@ -179,22 +169,30 @@ public class NodeTestUtils {
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
Object obj = response.getNodes().iterator().next().getTransport().getAddress()
.publishAddress();
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
port = address.address().getPort();
} else if (obj instanceof LocalTransportAddress) {
LocalTransportAddress address = (LocalTransportAddress) obj;
host = address.getHost();
port = address.getPort();
} else {
logger.info("class=" + obj.getClass());
}
if (host == null) {
throw new IllegalArgumentException("host not found");
}
}
private Node buildNode(String id) throws IOException {
Settings nodeSettings = settingsBuilder()
Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
.put("name", id)
.build();
logger.info("settings={}", nodeSettings.getAsMap());
Node node = new MockNode(nodeSettings);
Node node = new MockNode(nodeSettings, Netty4Plugin.class);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
@ -210,4 +208,22 @@ public class NodeTestUtils {
}
return new String(buf);
}
private static void deleteFiles() throws IOException {
Path directory = Paths.get(System.getProperty("path.home") + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
}

View file

@ -4,13 +4,13 @@ import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder;
@ -19,9 +19,9 @@ import org.junit.Test;
/**
*
*/
public class SearchTest extends NodeTestUtils {
public class SearchTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger("test");
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
@Test
public void testSearch() throws Exception {
@ -43,7 +43,8 @@ public class SearchTest extends NodeTestUtils {
.field("user8", "kimchy")
.field("user9", "kimchy")
.field("rowcount", i)
.field("rs", 1234)));
.field("rs", 1234)
.endObject()));
}
client.bulk(builder.request()).actionGet();

View file

@ -1,28 +1,32 @@
package org.xbib.elasticsearch;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.junit.Assert.assertEquals;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
/**
*
*/
public class SimpleTest extends NodeTestUtils {
public class SimpleTest extends NodeTestBase {
protected Settings getNodeSettings() {
return settingsBuilder()
.put("path.home", System.getProperty("path.home"))
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
return Settings.builder()
.put("cluster.name", getClusterName())
.put("discovery.type", "local")
.put("transport.type", "local")
.put("http.enabled", false)
.put("path.home", getHome())
.put("node.max_local_storage_nodes", 5)
.build();
}
@ -35,6 +39,15 @@ public class SimpleTest extends NodeTestUtils {
} catch (Exception e) {
// ignore
}
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client("1"), CreateIndexAction.INSTANCE)
.setIndex("test")
.setSettings(Settings.builder()
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
.build());
createIndexRequestBuilder.execute().actionGet();
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client("1"), IndexAction.INSTANCE);
indexRequestBuilder
.setIndex("test")
@ -42,7 +55,7 @@ public class SimpleTest extends NodeTestUtils {
.setId("1")
.setSource(jsonBuilder().startObject().field("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject())
.setRefresh(true)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute()
.actionGet();
String doc = client("1").prepareSearch("test")

View file

@ -1,10 +1,10 @@
package org.xbib.elasticsearch;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
@ -15,17 +15,16 @@ import java.io.IOException;
/**
*
*/
public class WildcardTest extends NodeTestUtils {
public class WildcardTest extends NodeTestBase {
protected Settings getNodeSettings() {
return settingsBuilder()
return Settings.builder()
.put("cluster.name", getClusterName())
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", false)
.put("discovery.type", "local")
.put("transport.type", "local")
.put("http.enabled", false)
.put("path.home", System.getProperty("path.home"))
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("path.home", getHome())
.put("node.max_local_storage_nodes", 5)
.build();
}
@ -51,7 +50,8 @@ public class WildcardTest extends NodeTestUtils {
client.index(indexRequest()
.index("index").type("type").id(id)
.source(jsonBuilder().startObject().field("field", fieldValue).endObject())
.refresh(true)).actionGet();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
.actionGet();
}
private long count(Client client, QueryBuilder queryBuilder) {

View file

@ -4,36 +4,39 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.ExecutorServices;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class BulkNodeClientTest extends NodeTestUtils {
public class BulkNodeClientTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeClientTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(BulkNodeClientTest.class.getName());
private static final Long MAX_ACTIONS = 1000L;
@ -165,18 +168,15 @@ public class BulkNodeClientTest extends NodeTestUtils {
.toBulkNodeClient(client("1"));
try {
client.newIndex("test")
.startBulk("test", -1, 1000);
ThreadPoolExecutor pool = EsExecutors.newFixed("bulk-nodeclient-test", maxthreads, 30,
EsExecutors.daemonThreadFactory("bulk-nodeclient-test"));
.startBulk("test", 30 * 1000, 1000);
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) {
pool.execute(new Runnable() {
public void run() {
for (int i = 0; i < maxloop; i++) {
executorService.execute(() -> {
for (int i1 = 0; i1 < maxloop; i1++) {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
}
latch.countDown();
}
});
}
logger.info("waiting for max 30 seconds...");
@ -184,8 +184,8 @@ public class BulkNodeClientTest extends NodeTestUtils {
logger.info("flush...");
client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30));
logger.info("got all responses, thread pool shutdown...");
pool.shutdown();
logger.info("got all responses, executor service shutdown...");
executorService.shutdown();
logger.info("pool is shut down");
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");

View file

@ -1,24 +1,24 @@
package org.xbib.elasticsearch.extras.client.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.Before;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
*
*/
public class BulkNodeClusterBlockTest extends NodeTestUtils {
public class BulkNodeClusterBlockTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger("test");
private static final Logger logger = LogManager.getLogger(BulkNodeClusterBlockTest.class.getName());
@Before
public void startNodes() {
@ -34,7 +34,7 @@ public class BulkNodeClusterBlockTest extends NodeTestUtils {
}
protected Settings getNodeSettings() {
return Settings.settingsBuilder()
return Settings.builder()
.put(super.getNodeSettings())
.put("discovery.zen.minimum_master_nodes", 2) // block until we have two nodes
.build();

View file

@ -1,13 +1,13 @@
package org.xbib.elasticsearch.extras.client.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -18,9 +18,9 @@ import static org.junit.Assert.*;
/**
*
*/
public class BulkNodeDuplicateIDTest extends NodeTestUtils {
public class BulkNodeDuplicateIDTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(BulkNodeDuplicateIDTest.class.getName());
private static final Long MAX_ACTIONS = 1000L;

View file

@ -1,13 +1,13 @@
package org.xbib.elasticsearch.extras.client.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.IndexAliasAdder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
@ -22,9 +22,9 @@ import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkNodeIndexAliasTest extends NodeTestUtils {
public class BulkNodeIndexAliasTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeIndexAliasTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(BulkNodeIndexAliasTest.class.getName());
@Test
public void testIndexAlias() throws Exception {

View file

@ -1,16 +1,21 @@
package org.xbib.elasticsearch.extras.client.node;
import org.elasticsearch.action.admin.indices.stats.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -21,9 +26,12 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class BulkNodeReplicaTest extends NodeTestUtils {
/**
*
*/
public class BulkNodeReplicaTest extends NodeTestBase {
private final static ESLogger logger = ESLoggerFactory.getLogger(BulkNodeReplicaTest.class.getSimpleName());
private final static Logger logger = LogManager.getLogger(BulkNodeReplicaTest.class.getName());
@Test
public void testReplicaLevel() throws Exception {
@ -33,12 +41,12 @@ public class BulkNodeReplicaTest extends NodeTestUtils {
startNode("3");
startNode("4");
Settings settingsTest1 = Settings.settingsBuilder()
Settings settingsTest1 = Settings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 3)
.build();
Settings settingsTest2 = Settings.settingsBuilder()
Settings settingsTest2 = Settings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build();

View file

@ -1,12 +1,12 @@
package org.xbib.elasticsearch.extras.client.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -17,9 +17,9 @@ import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils {
public class BulkNodeUpdateReplicaLevelTest extends NodeTestBase {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeUpdateReplicaLevelTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(BulkNodeUpdateReplicaLevelTest.class.getName());
@Test
public void testUpdateReplicaLevel() throws Exception {
@ -33,7 +33,7 @@ public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils {
int shardsAfterReplica;
Settings settings = Settings.settingsBuilder()
Settings settings = Settings.builder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.build();

View file

@ -3,15 +3,12 @@ package org.xbib.elasticsearch.extras.client.transport;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -19,7 +16,8 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -28,9 +26,7 @@ import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkTransportClientTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportClientTest.class.getSimpleName());
public class BulkTransportClientTest extends NodeTestBase {
private static final Long MAX_ACTIONS = 1000L;
@ -47,22 +43,26 @@ public class BulkTransportClientTest extends NodeTestUtils {
}
@Test
public void testBulkClient() throws IOException {
public void testBulkClientIndexCreation() throws IOException {
logger.info("firing up BulkTransportClient");
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
.toBulkTransportClient();
logger.info("creating index");
client.newIndex("test");
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
try {
logger.info("deleting/creating index sequence start");
client.deleteIndex("test")
.newIndex("test")
.deleteIndex("test");
logger.info("deleting/creating index sequence end");
} catch (NoNodeAvailableException e) {
logger.error("no node available");
} finally {
@ -76,18 +76,24 @@ public class BulkTransportClientTest extends NodeTestUtils {
@Test
public void testSingleDocBulkClient() throws IOException {
logger.info("firing up BulkTransportClient");
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
.toBulkTransportClient();
try {
logger.info("creating index");
client.newIndex("test");
logger.info("indexing one doc");
client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest
logger.info("flush");
client.flushIngest();
logger.info("wait for responses");
client.waitForResponses(TimeValue.timeValueSeconds(30));
logger.info("waited for responses");
} catch (InterruptedException e) {
// ignore
} catch (ExecutionException e) {
@ -105,10 +111,10 @@ public class BulkTransportClientTest extends NodeTestUtils {
}
@Test
public void testRandomDocsBulkClient() throws IOException {
public void testRandomDocsBulkClient() {
long numactions = NUM_ACTIONS;
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric())
@ -127,7 +133,10 @@ public class BulkTransportClientTest extends NodeTestUtils {
logger.error(e.getMessage(), e);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} catch (Throwable t) {
logger.error("unexcepted: " + t.getMessage(), t);
} finally {
logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount());
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
@ -138,34 +147,36 @@ public class BulkTransportClientTest extends NodeTestUtils {
}
@Test
public void testThreadedRandomDocsBulkClient() throws Exception {
public void testThreadedRandomDocsBulkClient() {
int maxthreads = Runtime.getRuntime().availableProcessors();
long maxactions = MAX_ACTIONS;
final long maxloop = NUM_ACTIONS;
Settings settingsForIndex = Settings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build();
logger.info("firing up client");
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
.toBulkTransportClient();
try {
logger.info("new index");
Settings settingsForIndex = Settings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build();
client.newIndex("test", settingsForIndex, null)
.startBulk("test", -1, 1000);
ThreadPoolExecutor pool =
EsExecutors.newFixed("bulkclient-test", maxthreads, 30, EsExecutors.daemonThreadFactory("bulkclient-test"));
logger.info("pool");
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) {
pool.execute(() -> {
executorService.execute(() -> {
logger.info("executing runnable");
for (int i1 = 0; i1 < maxloop; i1++) {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
}
logger.info("done runnable");
latch.countDown();
});
}
@ -174,13 +185,16 @@ public class BulkTransportClientTest extends NodeTestUtils {
logger.info("client flush ...");
client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30));
logger.info("thread pool to be shut down ...");
pool.shutdown();
logger.info("poot shut down");
logger.info("executor service to be shut down ...");
executorService.shutdown();
logger.info("executor service is shut down");
client.stopBulk("test");
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} catch (Throwable t) {
logger.error("unexpected error: " + t.getMessage(), t);
} finally {
client.stopBulk("test");
logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount());
assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
@ -188,8 +202,7 @@ public class BulkTransportClientTest extends NodeTestUtils {
assertFalse(client.hasThrowable());
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE)
// to avoid NPE at org.elasticsearch.action.search.SearchRequest.writeTo(SearchRequest.java:580)
.setIndices("_all")
.setIndices("test")
.setQuery(QueryBuilders.matchAllQuery())
.setSize(0);
assertEquals(maxthreads * maxloop,
@ -197,5 +210,4 @@ public class BulkTransportClientTest extends NodeTestUtils {
client.shutdown();
}
}
}

View file

@ -3,11 +3,9 @@ package org.xbib.elasticsearch.extras.client.transport;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -15,9 +13,10 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
public class BulkTransportDuplicateIDTest extends NodeTestUtils {
private final static ESLogger logger = ESLoggerFactory.getLogger(BulkTransportDuplicateIDTest.class.getSimpleName());
/**
*
*/
public class BulkTransportDuplicateIDTest extends NodeTestBase {
private final static Long MAX_ACTIONS = 1000L;
@ -27,7 +26,7 @@ public class BulkTransportDuplicateIDTest extends NodeTestUtils {
public void testDuplicateDocIDs() throws Exception {
long numactions = NUM_ACTIONS;
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())

View file

@ -4,13 +4,11 @@ import org.elasticsearch.action.admin.indices.stats.*;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -24,9 +22,7 @@ import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkTransportReplicaTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportReplicaTest.class.getSimpleName());
public class BulkTransportReplicaTest extends NodeTestBase {
@Test
public void testReplicaLevel() throws Exception {
@ -36,18 +32,18 @@ public class BulkTransportReplicaTest extends NodeTestUtils {
startNode("3");
startNode("4");
Settings settingsTest1 = Settings.settingsBuilder()
Settings settingsTest1 = Settings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 3)
.build();
Settings settingsTest2 = Settings.settingsBuilder()
Settings settingsTest2 = Settings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build();
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
.toBulkTransportClient();

View file

@ -1,12 +1,12 @@
package org.xbib.elasticsearch.extras.client.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -17,10 +17,9 @@ import static org.junit.Assert.assertFalse;
/**
*
*/
public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils {
public class BulkTransportUpdateReplicaLevelTest extends NodeTestBase {
private static final ESLogger logger =
ESLoggerFactory.getLogger(BulkTransportUpdateReplicaLevelTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(BulkTransportUpdateReplicaLevelTest.class.getName());
@Test
public void testUpdateReplicaLevel() throws Exception {
@ -34,13 +33,13 @@ public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils {
int shardsAfterReplica;
Settings settings = Settings.settingsBuilder()
Settings settings = Settings.builder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.build();
final BulkTransportClient client = ClientBuilder.builder()
.put(getSettings())
.put(getClientSettings())
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
.toBulkTransportClient();

View file

@ -1,5 +1,7 @@
package org.xbib.elasticsearch.extras.client;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchTimeoutException;
@ -39,8 +41,6 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
@ -68,7 +68,7 @@ import java.util.regex.Pattern;
*/
public abstract class AbstractClient {
private static final ESLogger logger = ESLoggerFactory.getLogger(AbstractClient.class.getName());
private static final Logger logger = LogManager.getLogger(AbstractClient.class.getName());
private Settings.Builder settingsBuilder;
@ -87,7 +87,7 @@ public abstract class AbstractClient {
}
public void resetSettings() {
this.settingsBuilder = Settings.settingsBuilder();
this.settingsBuilder = Settings.builder();
settings = null;
mappings = new HashMap<>();
}
@ -98,31 +98,31 @@ public abstract class AbstractClient {
public void setting(String key, String value) {
if (settingsBuilder == null) {
settingsBuilder = Settings.settingsBuilder();
settingsBuilder = Settings.builder();
}
settingsBuilder.put(key, value);
}
public void setting(String key, Boolean value) {
if (settingsBuilder == null) {
settingsBuilder = Settings.settingsBuilder();
settingsBuilder = Settings.builder();
}
settingsBuilder.put(key, value);
}
public void setting(String key, Integer value) {
if (settingsBuilder == null) {
settingsBuilder = Settings.settingsBuilder();
settingsBuilder = Settings.builder();
}
settingsBuilder.put(key, value);
}
public void setting(InputStream in) throws IOException {
settingsBuilder = Settings.settingsBuilder().loadFromStream(".json", in);
settingsBuilder = Settings.builder().loadFromStream(".json", in);
}
public Settings.Builder settingsBuilder() {
return settingsBuilder != null ? settingsBuilder : Settings.settingsBuilder();
return settingsBuilder != null ? settingsBuilder : Settings.builder();
}
public Settings settings() {
@ -130,7 +130,7 @@ public abstract class AbstractClient {
return settings;
}
if (settingsBuilder == null) {
settingsBuilder = Settings.settingsBuilder();
settingsBuilder = Settings.builder();
}
return settingsBuilder.build();
}
@ -166,7 +166,7 @@ public abstract class AbstractClient {
if (value == null) {
throw new IOException("no value given");
}
Settings.Builder updateSettingsBuilder = Settings.settingsBuilder();
Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString());
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
.settings(updateSettingsBuilder);
@ -218,7 +218,7 @@ public abstract class AbstractClient {
new ClusterStateRequestBuilder(client(), ClusterStateAction.INSTANCE).all();
ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
String name = clusterStateResponse.getClusterName().value();
int nodeCount = clusterStateResponse.getState().getNodes().size();
int nodeCount = clusterStateResponse.getState().getNodes().getSize();
return name + " (" + nodeCount + " nodes connected)";
} catch (ElasticsearchTimeoutException e) {
logger.warn(e.getMessage(), e);
@ -476,9 +476,9 @@ public abstract class AbstractClient {
return null;
}
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client(), SearchAction.INSTANCE);
SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SortBuilder<?> sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchResponse searchResponse = searchRequestBuilder.setIndices(index)
.addField(timestampfieldname)
.addStoredField(timestampfieldname)
.setSize(1)
.addSort(sort)
.execute().actionGet();

View file

@ -9,7 +9,6 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -51,7 +50,7 @@ public class BulkProcessor implements Closeable {
private BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytes();
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
@ -176,18 +175,6 @@ public class BulkProcessor implements Closeable {
executeIfNeeded();
}
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType)
throws Exception {
return add(data, defaultIndex, defaultType, null);
}
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultType, @Nullable Object payload) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true);
executeIfNeeded();
return this;
}
private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {
@ -441,7 +428,7 @@ public class BulkProcessor implements Closeable {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {

View file

@ -1,7 +1,7 @@
package org.xbib.elasticsearch.extras.client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.Inet4Address;
@ -20,7 +20,7 @@ import java.util.Locale;
*/
public class NetworkUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(NetworkUtils.class.getName());
private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName());
private static final String IPV4_SETTING = "java.net.preferIPv4Stack";

View file

@ -1,8 +1,8 @@
package org.xbib.elasticsearch.extras.client.node;
import com.google.common.collect.ImmutableSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
@ -18,13 +18,12 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.xbib.elasticsearch.extras.client.AbstractClient;
import org.xbib.elasticsearch.extras.client.BulkControl;
@ -44,7 +43,7 @@ import java.util.concurrent.TimeUnit;
*/
public class BulkNodeClient extends AbstractClient implements ClientMethods {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeClient.class.getName());
private static final Logger logger = LogManager.getLogger(BulkNodeClient.class.getName());
private int maxActionsPerRequest = DEFAULT_MAX_ACTIONS_PER_REQUEST;
@ -216,7 +215,11 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
try {
node.start();
} catch (NodeValidationException e) {
throw new IOException(e);
}
this.client = node.client();
}
}
@ -389,7 +392,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
}
if (control != null && control.indices() != null && !control.indices().isEmpty()) {
logger.debug("stopping bulk mode for indices {}...", control.indices());
for (String index : ImmutableSet.copyOf(control.indices())) {
for (String index : control.indices()) {
stopBulk(index);
}
metric.stop();
@ -505,7 +508,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
private class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, Version.CURRENT, classpathPlugins);
super(env, classpathPlugins);
}
}

View file

@ -1,6 +1,7 @@
package org.xbib.elasticsearch.extras.client.transport;
import com.google.common.collect.ImmutableSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
@ -21,12 +22,13 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.Netty4Plugin;
import org.xbib.elasticsearch.extras.client.AbstractClient;
import org.xbib.elasticsearch.extras.client.BulkControl;
import org.xbib.elasticsearch.extras.client.BulkMetric;
@ -39,6 +41,7 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -49,7 +52,9 @@ import java.util.concurrent.TimeUnit;
*/
public class BulkTransportClient extends AbstractClient implements ClientMethods {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportClient.class.getName());
private static final Logger logger = LogManager.getLogger(BulkTransportClient.class.getName());
private static final Settings DEFAULT_SETTINGS = Settings.builder().put("transport.type.default", "local").build();
private int maxActionsPerRequest = DEFAULT_MAX_ACTIONS_PER_REQUEST;
@ -165,6 +170,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
builder.setBulkSize(maxVolumePerRequest);
}
this.bulkProcessor = builder.build();
// aut-connect here
try {
Collection<InetSocketTransportAddress> addrs = findAddresses(settings);
if (!connect(addrs, settings.getAsBoolean("autodiscover", false))) {
@ -205,9 +211,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
+ " " + System.getProperty("java.vm.version");
logger.info("creating transport client on {} with effective settings {}",
version, settings.getAsMap());
this.client = TransportClient.builder()
.settings(settings)
.build();
this.client = new TransportClient(Settings.builder()
.put("cluster.name", settings.get("cluster.name"))
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
.build(), Collections.singletonList(Netty4Plugin.class));
this.ignoreBulkErrors = settings.getAsBoolean("ignoreBulkErrors", true);
}
}
@ -313,7 +320,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
if (control == null) {
return this;
}
if (!control.isBulk(index)) {
if (!control.isBulk(index) && startRefreshIntervalSeconds > 0L && stopRefreshIntervalSeconds > 0L) {
control.startBulk(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
updateIndexSetting(index, "refresh_interval", startRefreshIntervalSeconds + "s");
}
@ -326,7 +333,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
return this;
}
if (control.isBulk(index)) {
updateIndexSetting(index, "refresh_interval", control.getStopBulkRefreshIntervals().get(index) + "s");
long secs = control.getStopBulkRefreshIntervals().get(index);
if (secs > 0L) {
updateIndexSetting(index, "refresh_interval", secs + "s");
}
control.finishBulk(index);
}
return this;
@ -461,7 +471,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
}
if (control != null && control.indices() != null && !control.indices().isEmpty()) {
logger.debug("stopping bulk mode for indices {}...", control.indices());
for (String index : ImmutableSet.copyOf(control.indices())) {
for (String index : control.indices()) {
stopBulk(index);
}
metric.stop();
@ -485,7 +495,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
}
private Settings findSettings() {
Settings.Builder settingsBuilder = Settings.settingsBuilder();
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put("host", "localhost");
try {
String hostname = NetworkUtils.getLocalAddress().getHostName();

View file

@ -1,9 +1,10 @@
package org.xbib.elasticsearch.extras.client.transport;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -16,39 +17,41 @@ import org.elasticsearch.action.TransportActionNodeProxy;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessRequest;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.client.transport.ClientTransportModule;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -59,6 +62,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* Stripped-down transport client without node sampling and without retrying.
@ -73,17 +77,13 @@ public class TransportClient extends AbstractClient {
private final Injector injector;
private final ProxyActionMap proxyActionMap;
private final long pingTimeout;
private final ClusterName clusterName;
private final TransportService transportService;
private final Version minCompatibilityVersion;
private final Headers headers;
private final ProxyActionMap proxy;
private final AtomicInteger tempNodeId = new AtomicInteger();
@ -99,20 +99,41 @@ public class TransportClient extends AbstractClient {
private volatile boolean closed;
private TransportClient(Injector injector) {
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class),
injector.getInstance(Headers.class));
this.injector = injector;
this.clusterName = injector.getInstance(ClusterName.class);
this.transportService = injector.getInstance(TransportService.class);
this.minCompatibilityVersion = injector.getInstance(Version.class).minimumCompatibilityVersion();
this.headers = injector.getInstance(Headers.class);
this.pingTimeout = this.settings.getAsTime("client.transport.ping_timeout", timeValueSeconds(5)).millis();
this.proxyActionMap = injector.getInstance(ProxyActionMap.class);
/**
* Creates a new TransportClient with the given settings and plugins.
* @param settings settings
*/
public TransportClient(Settings settings) {
this(buildTemplate(settings, Settings.EMPTY, Collections.emptyList()));
}
public static Builder builder() {
return new Builder();
/**
* Creates a new TransportClient with the given settings and plugins.
* @param settings settings
* @param plugins plugins
*/
public TransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
this(buildTemplate(settings, Settings.EMPTY, plugins));
}
/**
* Creates a new TransportClient with the given settings, defaults and plugins.
* @param settings the client settings
* @param defaultSettings default settings that are merged after the plugins have added it's additional settings.
* @param plugins the client plugins
*/
protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins) {
this(buildTemplate(settings, defaultSettings, plugins));
}
private TransportClient(ClientTemplate template) {
super(template.getSettings(), template.getThreadPool());
this.injector = template.injector;
this.clusterName = new ClusterName(template.getSettings().get("cluster.name", "elasticsearch"));
this.transportService = injector.getInstance(TransportService.class);
this.pingTimeout = this.settings.getAsTime("client.transport.ping_timeout", timeValueSeconds(5)).millis();
this.proxy = template.proxy;
}
/**
@ -123,7 +144,7 @@ public class TransportClient extends AbstractClient {
public List<TransportAddress> transportAddresses() {
List<TransportAddress> lstBuilder = new ArrayList<>();
for (DiscoveryNode listedNode : listedNodes) {
lstBuilder.add(listedNode.address());
lstBuilder.add(listedNode.getAddress());
}
return Collections.unmodifiableList(lstBuilder);
}
@ -170,7 +191,7 @@ public class TransportClient extends AbstractClient {
public TransportClient addDiscoveryNodes(DiscoveryNodes discoveryNodes) {
Collection<InetSocketTransportAddress> addresses = new ArrayList<>();
for (DiscoveryNode discoveryNode : discoveryNodes) {
addresses.add((InetSocketTransportAddress) discoveryNode.address());
addresses.add((InetSocketTransportAddress) discoveryNode.getAddress());
}
addTransportAddresses(addresses);
return this;
@ -185,7 +206,7 @@ public class TransportClient extends AbstractClient {
for (TransportAddress transportAddress : transportAddresses) {
boolean found = false;
for (DiscoveryNode otherNode : listedNodes) {
if (otherNode.address().equals(transportAddress)) {
if (otherNode.getAddress().equals(transportAddress)) {
found = true;
logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
break;
@ -202,7 +223,7 @@ public class TransportClient extends AbstractClient {
discoveryNodeList.addAll(listedNodes());
for (TransportAddress transportAddress : filtered) {
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeId.incrementAndGet(), transportAddress,
minCompatibilityVersion);
Version.CURRENT.minimumCompatibilityVersion());
logger.debug("adding address [{}]", node);
discoveryNodeList.add(node);
}
@ -225,7 +246,7 @@ public class TransportClient extends AbstractClient {
}
List<DiscoveryNode> builder = new ArrayList<>();
for (DiscoveryNode otherNode : listedNodes) {
if (!otherNode.address().equals(transportAddress)) {
if (!otherNode.getAddress().equals(transportAddress)) {
builder.add(otherNode);
} else {
logger.debug("removing address [{}]", otherNode);
@ -253,7 +274,7 @@ public class TransportClient extends AbstractClient {
nodes = Collections.emptyList();
}
injector.getInstance(TransportService.class).close();
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).nodeServices()) {
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).getGuiceServiceClasses()) {
injector.getInstance(plugin).close();
}
try {
@ -261,7 +282,6 @@ public class TransportClient extends AbstractClient {
} catch (Exception e) {
logger.debug(e.getMessage(), e);
}
injector.getInstance(PageCacheRecycler.class).close();
}
private void connect() {
@ -279,7 +299,7 @@ public class TransportClient extends AbstractClient {
}
try {
LivenessResponse livenessResponse = transportService.submitRequest(listedNode,
TransportLivenessAction.NAME, headers.applyTo(new LivenessRequest()),
TransportLivenessAction.NAME, new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new FutureTransportResponseHandler<LivenessResponse>() {
@ -293,9 +313,10 @@ public class TransportClient extends AbstractClient {
newFilteredNodes.add(listedNode);
} else if (livenessResponse.getDiscoveryNode() != null) {
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo.getHostName(),
nodeWithInfo.getHostAddress(), listedNode.address(), nodeWithInfo.attributes(),
nodeWithInfo.version()));
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(),
nodeWithInfo.getEphemeralId(), nodeWithInfo.getHostName(),
nodeWithInfo.getHostAddress(), listedNode.getAddress(), nodeWithInfo.getAttributes(),
nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
} else {
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node",
listedNode);
@ -324,9 +345,9 @@ public class TransportClient extends AbstractClient {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
protected <R extends ActionRequest, S extends ActionResponse, T extends ActionRequestBuilder<R, S, T>>
protected <R extends ActionRequest<R>, S extends ActionResponse, T extends ActionRequestBuilder<R, S, T>>
void doExecute(Action<R, S, T> action, final R request, final ActionListener<S> listener) {
final TransportActionNodeProxy<R, S> proxyAction = proxyActionMap.getProxies().get(action);
final TransportActionNodeProxy<R, S> proxyAction = proxy.getProxies().get(action);
if (proxyAction == null) {
throw new IllegalStateException("undefined action " + action);
}
@ -347,89 +368,17 @@ public class TransportClient extends AbstractClient {
}
}
/**
*
*/
public static class Builder {
private Settings settings = Settings.EMPTY;
private List<Class<? extends Plugin>> pluginClasses = new ArrayList<>();
public Builder settings(Settings.Builder settings) {
return settings(settings.build());
}
public Builder settings(Settings settings) {
this.settings = settings;
return this;
}
public Builder addPlugin(Class<? extends Plugin> pluginClass) {
pluginClasses.add(pluginClass);
return this;
}
public TransportClient build() {
Settings transportClientSettings = settingsBuilder()
.put("transport.ping.schedule", this.settings.get("ping.interval", "30s"))
.put(InternalSettingsPreparer.prepareSettings(this.settings))
.put("network.server", false)
.put("node.client", true)
.put(CLIENT_TYPE_SETTING, CLIENT_TYPE)
.build();
PluginsService pluginsService = new PluginsService(transportClientSettings, null, null, pluginClasses);
this.settings = pluginsService.updatedSettings();
Version version = Version.CURRENT;
final ThreadPool threadPool = new ThreadPool(transportClientSettings);
boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
modules.add(pluginModule);
}
modules.add(new PluginsModule(pluginsService));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule());
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new TransportModule(this.settings));
modules.add(new SearchModule() {
@Override
protected void configure() {
// noop
}
});
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));
pluginsService.processModules(modules);
Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();
TransportClient transportClient = new TransportClient(injector);
success = true;
return transportClient;
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
}
/**
* The {@link ProxyActionMap} must be declared public.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static class ProxyActionMap {
private static class ProxyActionMap {
private final ImmutableMap<Action, TransportActionNodeProxy> proxies;
private final Map<Action, TransportActionNodeProxy> proxies;
@Inject
public ProxyActionMap(Settings settings, TransportService transportService, Map<String, GenericAction> actions) {
public ProxyActionMap(Settings settings, TransportService transportService, List<GenericAction> actions) {
MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
for (GenericAction action : actions.values()) {
for (GenericAction action : actions) {
if (action instanceof Action) {
actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
@ -437,9 +386,120 @@ public class TransportClient extends AbstractClient {
this.proxies = actionsBuilder.immutableMap();
}
public ImmutableMap<Action, TransportActionNodeProxy> getProxies() {
Map<Action, TransportActionNodeProxy> getProxies() {
return proxies;
}
}
private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
Collection<Class<? extends Plugin>> plugins) {
if (!Node.NODE_NAME_SETTING.exists(providedSettings)) {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
try {
final List<Setting<?>> additionalSettings = new ArrayList<>();
final List<String> additionalSettingsFilter = new ArrayList<>();
additionalSettings.addAll(pluginsService.getPluginSettings());
additionalSettingsFilter.addAll(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
NetworkModule networkModule = new NetworkModule(networkService, settings, true);
SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(networkModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream())
.collect(Collectors.toList()));
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
modules.add(networkModule);
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule);
pluginsService.processModules(modules);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
}));
Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
final ProxyActionMap proxy = new ProxyActionMap(settings, transportService,
actionModule.getActions().values().stream()
.map(ActionPlugin.ActionHandler::getAction).collect(Collectors.toList()));
List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>();
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
transportService.start();
transportService.acceptIncomingRequests();
ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents,
proxy, namedWriteableRegistry);
resourcesToClose.clear();
return transportClient;
} finally {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
private static final Logger logger = LogManager.getLogger(TransportClient.class);
private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
.put(NetworkService.NETWORK_SERVER.getKey(), false)
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
if (!settings.isEmpty()) {
logger.info(settings.getAsMap());
settingsBuilder.put(InternalSettingsPreparer.prepareSettings(settings));
}
return new PluginsService(settingsBuilder.build(), null, null, plugins);
}
private static final class ClientTemplate {
final Injector injector;
private final List<LifecycleComponent> pluginLifecycleComponents;
private final ProxyActionMap proxy;
private final NamedWriteableRegistry namedWriteableRegistry;
private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
ProxyActionMap proxy, NamedWriteableRegistry namedWriteableRegistry) {
this.injector = injector;
this.pluginLifecycleComponents = pluginLifecycleComponents;
this.proxy = proxy;
this.namedWriteableRegistry = namedWriteableRegistry;
}
Settings getSettings() {
return injector.getInstance(Settings.class);
}
ThreadPool getThreadPool() {
return injector.getInstance(ThreadPool.class);
}
}
}