diff --git a/README.adoc b/README.adoc index 852224a..aa64451 100644 --- a/README.adoc +++ b/README.adoc @@ -99,7 +99,7 @@ You will need Java 8, although Elasticsearch 2.x requires Java 7. Java 7 is not ## Dependencies This project depends only on https://github.com/xbib/metrics which is a slim version of Coda Hale's metrics library, -and Elasticsearch. +Elasticsearch, and Log4j2 API. ## How to decode the Elasticsearch version diff --git a/build.gradle b/build.gradle index bc9dc21..33d34f8 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'org.xbib' -version = '2.2.1.1' +version = '5.0.1.0' printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" + "Build: group: ${project.group} name: ${project.name} version: ${project.version}\n", @@ -56,11 +56,10 @@ configurations { dependencies { compile "org.xbib:metrics:1.0.0" - compile "org.elasticsearch:elasticsearch:2.2.1" - testCompile "net.java.dev.jna:jna:4.1.0" + compile "org.elasticsearch.client:transport:5.0.1" + compile "org.apache.logging.log4j:log4j-api:2.7" testCompile "junit:junit:4.12" testCompile "org.apache.logging.log4j:log4j-core:2.7" - testCompile "org.apache.logging.log4j:log4j-slf4j-impl:2.7" wagon 'org.apache.maven.wagon:wagon-ssh-external:2.10' } @@ -69,7 +68,7 @@ tasks.withType(JavaCompile) { options.compilerArgs << "-Xlint:all" << "-profile" << "compact3" } -task integrationTest(type: Test) { +task integrationTest(type: Test, group: 'verification') { include '**/MiscTestSuite.class' include '**/BulkNodeTestSuite.class' include '**/BulkTransportTestSuite.class' @@ -81,7 +80,7 @@ task integrationTest(type: Test) { classpath += sourceSets.integrationTest.output outputs.upToDateWhen { false } systemProperty 'path.home', projectDir.absolutePath - testLogging.showStandardStreams = true + testLogging.showStandardStreams = false } integrationTest.mustRunAfter test diff --git a/src/integration-test/java/org/elasticsearch/node/MockNode.java b/src/integration-test/java/org/elasticsearch/node/MockNode.java index b0c02eb..10c9e86 100644 --- a/src/integration-test/java/org/elasticsearch/node/MockNode.java +++ b/src/integration-test/java/org/elasticsearch/node/MockNode.java @@ -1,6 +1,5 @@ package org.elasticsearch.node; -import org.elasticsearch.Version; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.plugins.Plugin; @@ -21,14 +20,14 @@ public class MockNode extends Node { super(settings); } - public MockNode(Settings settings, Collection> classpathPlugins) { - super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins); - } - public MockNode(Settings settings, Class classpathPlugin) { this(settings, list(classpathPlugin)); } + public MockNode(Settings settings, Collection> classpathPlugins) { + super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins); + } + private static Collection> list(Class classpathPlugin) { Collection> list = new ArrayList<>(); list.add(classpathPlugin); diff --git a/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java b/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java index 970268e..0c683c1 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/AliasTest.java @@ -4,16 +4,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.junit.Test; import java.io.IOException; @@ -27,9 +26,9 @@ import java.util.regex.Pattern; /** * */ -public class AliasTest extends NodeTestUtils { +public class AliasTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger(AliasTest.class.getName()); + private static final Logger logger = LogManager.getLogger(AliasTest.class.getName()); @Test public void testAlias() throws IOException { @@ -37,11 +36,9 @@ public class AliasTest extends NodeTestUtils { client("1").admin().indices().create(indexRequest).actionGet(); // put alias IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); - String[] indices = new String[]{"test"}; - String[] aliases = new String[]{"test_alias"}; - IndicesAliasesRequest.AliasActions aliasAction = - new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); - indicesAliasesRequest.addAliasAction(aliasAction); + indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() + .index("test").alias("test_alias") + ); client("1").admin().indices().aliases(indicesAliasesRequest).actionGet(); // get alias GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY); @@ -62,11 +59,10 @@ public class AliasTest extends NodeTestUtils { indexRequest = new CreateIndexRequest("test20160103"); client("1").admin().indices().create(indexRequest).actionGet(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); - String[] indices = new String[]{"test20160101", "test20160102", "test20160103"}; - String[] aliases = new String[]{alias}; - IndicesAliasesRequest.AliasActions aliasAction = - new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); - indicesAliasesRequest.addAliasAction(aliasAction); + indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() + .indices("test20160101", "test20160102", "test20160103") + .alias(alias) + ); client("1").admin().indices().aliases(indicesAliasesRequest).actionGet(); GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client("1"), diff --git a/src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java b/src/integration-test/java/org/xbib/elasticsearch/NodeTestBase.java similarity index 60% rename from src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java rename to src/integration-test/java/org/xbib/elasticsearch/NodeTestBase.java index d098332..f68a729 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/NodeTestUtils.java +++ b/src/integration-test/java/org/xbib/elasticsearch/NodeTestBase.java @@ -1,8 +1,7 @@ package org.xbib.elasticsearch; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; - -import org.elasticsearch.ElasticsearchTimeoutException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -10,13 +9,14 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.transport.Netty4Plugin; import org.junit.After; import org.junit.Before; import org.xbib.elasticsearch.extras.client.NetworkUtils; @@ -32,13 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger; /** * */ -public class NodeTestUtils { +public class NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger("test"); + protected static final Logger logger = LogManager.getLogger("test"); - private static Random random = new Random(); + private static final Random random = new Random(); - private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); + private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private Map nodes = new HashMap<>(); @@ -46,59 +46,39 @@ public class NodeTestUtils { private AtomicInteger counter = new AtomicInteger(); - private String cluster; + private String clustername; private String host; private int port; - private static void deleteFiles() throws IOException { - Path directory = Paths.get(System.getProperty("path.home") + "/data"); - Files.walkFileTree(directory, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - Files.delete(file); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - Files.delete(dir); - return FileVisitResult.CONTINUE; - } - - }); - - } - @Before public void startNodes() { try { - logger.info("starting"); + logger.info("settings cluster name"); setClusterName(); + logger.info("starting nodes"); startNode("1"); findNodeAddress(); - try { - ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); + ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE, + new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) + .timeout(TimeValue.timeValueSeconds(30))).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + throw new IOException("cluster state is " + healthResponse.getStatus().name() + + ", from here on, everything will fail!"); } + logger.info("nodes are started"); } catch (Throwable t) { - logger.error("startNodes failed", t); + logger.error("start of nodes failed", t); } } @After public void stopNodes() { try { + logger.info("stopping nodes"); closeNodes(); - } catch (Exception e) { + } catch (Throwable e) { logger.error("can not close nodes", e); } finally { try { @@ -114,37 +94,43 @@ public class NodeTestUtils { } protected void setClusterName() { - this.cluster = "test-helper-cluster-" + this.clustername = "test-helper-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "-" + System.getProperty("user.name") + "-" + counter.incrementAndGet(); } protected String getClusterName() { - return cluster; - } - - protected Settings getSettings() { - return settingsBuilder() - .put("host", host) - .put("port", port) - .put("cluster.name", cluster) - .put("path.home", getHome()) - .build(); + return clustername; } protected Settings getNodeSettings() { - return settingsBuilder() - .put("cluster.name", cluster) - .put("cluster.routing.schedule", "50ms") - .put("cluster.routing.allocation.disk.threshold_enabled", false) - .put("discovery.zen.multicast.enabled", true) - .put("discovery.zen.multicast.ping_timeout", "5s") - .put("http.enabled", true) - .put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors()) - .put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low - .put("index.number_of_replicas", 0) + String hostname = NetworkUtils.getLocalAddress().getHostName(); + return Settings.builder() + .put("cluster.name", clustername) + // required to build a cluster, replica tests will test this. + .put("discovery.zen.ping.unicast.hosts", hostname) + .put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME) + .put("network.host", hostname) + .put("http.enabled", false) .put("path.home", getHome()) + // maximum five nodes on same host + .put("node.max_local_storage_nodes", 5) + .put("thread_pool.bulk.size", Runtime.getRuntime().availableProcessors()) + // default is 50 which is too low + .put("thread_pool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) + .build(); + } + + + protected Settings getClientSettings() { + if (host == null) { + throw new IllegalStateException("host is null"); + } + // the host to which transport client should connect to + return Settings.builder() + .put("cluster.name", clustername) + .put("host", host + ":" + port) .build(); } @@ -153,7 +139,11 @@ public class NodeTestUtils { } public void startNode(String id) throws IOException { - buildNode(id).start(); + try { + buildNode(id).start(); + } catch (NodeValidationException e) { + throw new IOException(e); + } } public AbstractClient client(String id) { @@ -179,22 +169,30 @@ public class NodeTestUtils { protected void findNodeAddress() { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); - Object obj = response.iterator().next().getTransport().getAddress() + Object obj = response.getNodes().iterator().next().getTransport().getAddress() .publishAddress(); if (obj instanceof InetSocketTransportAddress) { InetSocketTransportAddress address = (InetSocketTransportAddress) obj; host = address.address().getHostName(); port = address.address().getPort(); + } else if (obj instanceof LocalTransportAddress) { + LocalTransportAddress address = (LocalTransportAddress) obj; + host = address.getHost(); + port = address.getPort(); + } else { + logger.info("class=" + obj.getClass()); + } + if (host == null) { + throw new IllegalArgumentException("host not found"); } } private Node buildNode(String id) throws IOException { - Settings nodeSettings = settingsBuilder() + Settings nodeSettings = Settings.builder() .put(getNodeSettings()) - .put("name", id) .build(); logger.info("settings={}", nodeSettings.getAsMap()); - Node node = new MockNode(nodeSettings); + Node node = new MockNode(nodeSettings, Netty4Plugin.class); AbstractClient client = (AbstractClient) node.client(); nodes.put(id, node); clients.put(id, client); @@ -210,4 +208,22 @@ public class NodeTestUtils { } return new String(buf); } + + private static void deleteFiles() throws IOException { + Path directory = Paths.get(System.getProperty("path.home") + "/data"); + Files.walkFileTree(directory, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + + }); + } } diff --git a/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java b/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java index 8d1276a..efed10c 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/SearchTest.java @@ -4,13 +4,13 @@ import static org.elasticsearch.client.Requests.indexRequest; import static org.elasticsearch.client.Requests.refreshRequest; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.sort.SortOrder; @@ -19,9 +19,9 @@ import org.junit.Test; /** * */ -public class SearchTest extends NodeTestUtils { +public class SearchTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger("test"); + private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); @Test public void testSearch() throws Exception { @@ -43,7 +43,8 @@ public class SearchTest extends NodeTestUtils { .field("user8", "kimchy") .field("user9", "kimchy") .field("rowcount", i) - .field("rs", 1234))); + .field("rs", 1234) + .endObject())); } client.bulk(builder.request()).actionGet(); diff --git a/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java b/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java index 0af13df..70f1373 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/SimpleTest.java @@ -1,28 +1,32 @@ package org.xbib.elasticsearch; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.junit.Assert.assertEquals; +import org.elasticsearch.action.admin.indices.create.CreateIndexAction; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Settings; import org.junit.Test; /** * */ -public class SimpleTest extends NodeTestUtils { +public class SimpleTest extends NodeTestBase { protected Settings getNodeSettings() { - return settingsBuilder() - .put("path.home", System.getProperty("path.home")) - .put("index.analysis.analyzer.default.filter.0", "lowercase") - .put("index.analysis.analyzer.default.filter.1", "trim") - .put("index.analysis.analyzer.default.tokenizer", "keyword") + return Settings.builder() + .put("cluster.name", getClusterName()) + .put("discovery.type", "local") + .put("transport.type", "local") + .put("http.enabled", false) + .put("path.home", getHome()) + .put("node.max_local_storage_nodes", 5) .build(); } @@ -35,6 +39,15 @@ public class SimpleTest extends NodeTestUtils { } catch (Exception e) { // ignore } + CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client("1"), CreateIndexAction.INSTANCE) + .setIndex("test") + .setSettings(Settings.builder() + .put("index.analysis.analyzer.default.filter.0", "lowercase") + .put("index.analysis.analyzer.default.filter.1", "trim") + .put("index.analysis.analyzer.default.tokenizer", "keyword") + .build()); + createIndexRequestBuilder.execute().actionGet(); + IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client("1"), IndexAction.INSTANCE); indexRequestBuilder .setIndex("test") @@ -42,7 +55,7 @@ public class SimpleTest extends NodeTestUtils { .setId("1") .setSource(jsonBuilder().startObject().field("field", "1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject()) - .setRefresh(true) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .execute() .actionGet(); String doc = client("1").prepareSearch("test") diff --git a/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java b/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java index 6e252d1..d412654 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/WildcardTest.java @@ -1,10 +1,10 @@ package org.xbib.elasticsearch; import static org.elasticsearch.client.Requests.indexRequest; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; @@ -15,17 +15,16 @@ import java.io.IOException; /** * */ -public class WildcardTest extends NodeTestUtils { +public class WildcardTest extends NodeTestBase { protected Settings getNodeSettings() { - return settingsBuilder() + return Settings.builder() .put("cluster.name", getClusterName()) - .put("cluster.routing.allocation.disk.threshold_enabled", false) - .put("discovery.zen.multicast.enabled", false) + .put("discovery.type", "local") + .put("transport.type", "local") .put("http.enabled", false) - .put("path.home", System.getProperty("path.home")) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) + .put("path.home", getHome()) + .put("node.max_local_storage_nodes", 5) .build(); } @@ -51,7 +50,8 @@ public class WildcardTest extends NodeTestUtils { client.index(indexRequest() .index("index").type("type").id(id) .source(jsonBuilder().startObject().field("field", fieldValue).endObject()) - .refresh(true)).actionGet(); + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) + .actionGet(); } private long count(Client client, QueryBuilder queryBuilder) { diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java index e01ca67..a1165ff 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java @@ -4,36 +4,39 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.util.ExecutorServices; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Before; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * */ -public class BulkNodeClientTest extends NodeTestUtils { +public class BulkNodeClientTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeClientTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(BulkNodeClientTest.class.getName()); private static final Long MAX_ACTIONS = 1000L; @@ -165,18 +168,15 @@ public class BulkNodeClientTest extends NodeTestUtils { .toBulkNodeClient(client("1")); try { client.newIndex("test") - .startBulk("test", -1, 1000); - ThreadPoolExecutor pool = EsExecutors.newFixed("bulk-nodeclient-test", maxthreads, 30, - EsExecutors.daemonThreadFactory("bulk-nodeclient-test")); + .startBulk("test", 30 * 1000, 1000); + ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { - pool.execute(new Runnable() { - public void run() { - for (int i = 0; i < maxloop; i++) { - client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); - } - latch.countDown(); + executorService.execute(() -> { + for (int i1 = 0; i1 < maxloop; i1++) { + client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } + latch.countDown(); }); } logger.info("waiting for max 30 seconds..."); @@ -184,8 +184,8 @@ public class BulkNodeClientTest extends NodeTestUtils { logger.info("flush..."); client.flushIngest(); client.waitForResponses(TimeValue.timeValueSeconds(30)); - logger.info("got all responses, thread pool shutdown..."); - pool.shutdown(); + logger.info("got all responses, executor service shutdown..."); + executorService.shutdown(); logger.info("pool is shut down"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java index 09c628d..2b79ddc 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClusterBlockTest.java @@ -1,24 +1,24 @@ package org.xbib.elasticsearch.extras.client.node; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.junit.Before; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * */ -public class BulkNodeClusterBlockTest extends NodeTestUtils { +public class BulkNodeClusterBlockTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger("test"); + private static final Logger logger = LogManager.getLogger(BulkNodeClusterBlockTest.class.getName()); @Before public void startNodes() { @@ -34,7 +34,7 @@ public class BulkNodeClusterBlockTest extends NodeTestUtils { } protected Settings getNodeSettings() { - return Settings.settingsBuilder() + return Settings.builder() .put(super.getNodeSettings()) .put("discovery.zen.minimum_master_nodes", 2) // block until we have two nodes .build(); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java index 7d8ba1f..d7e9a30 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java @@ -1,13 +1,13 @@ package org.xbib.elasticsearch.extras.client.node; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -18,9 +18,9 @@ import static org.junit.Assert.*; /** * */ -public class BulkNodeDuplicateIDTest extends NodeTestUtils { +public class BulkNodeDuplicateIDTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(BulkNodeDuplicateIDTest.class.getName()); private static final Long MAX_ACTIONS = 1000L; diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java index d4b19b0..a6f00a4 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java @@ -1,13 +1,13 @@ package org.xbib.elasticsearch.extras.client.node; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.IndexAliasAdder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; @@ -22,9 +22,9 @@ import static org.junit.Assert.assertFalse; /** * */ -public class BulkNodeIndexAliasTest extends NodeTestUtils { +public class BulkNodeIndexAliasTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeIndexAliasTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(BulkNodeIndexAliasTest.class.getName()); @Test public void testIndexAlias() throws Exception { diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java index 93141e1..637a28e 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java @@ -1,16 +1,21 @@ package org.xbib.elasticsearch.extras.client.node; -import org.elasticsearch.action.admin.indices.stats.*; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.shard.IndexingStats; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -21,9 +26,12 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -public class BulkNodeReplicaTest extends NodeTestUtils { +/** + * + */ +public class BulkNodeReplicaTest extends NodeTestBase { - private final static ESLogger logger = ESLoggerFactory.getLogger(BulkNodeReplicaTest.class.getSimpleName()); + private final static Logger logger = LogManager.getLogger(BulkNodeReplicaTest.class.getName()); @Test public void testReplicaLevel() throws Exception { @@ -33,12 +41,12 @@ public class BulkNodeReplicaTest extends NodeTestUtils { startNode("3"); startNode("4"); - Settings settingsTest1 = Settings.settingsBuilder() + Settings settingsTest1 = Settings.builder() .put("index.number_of_shards", 2) .put("index.number_of_replicas", 3) .build(); - Settings settingsTest2 = Settings.settingsBuilder() + Settings settingsTest2 = Settings.builder() .put("index.number_of_shards", 2) .put("index.number_of_replicas", 1) .build(); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java index b1c88fe..7f43457 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java @@ -1,12 +1,12 @@ package org.xbib.elasticsearch.extras.client.node; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -17,9 +17,9 @@ import static org.junit.Assert.assertFalse; /** * */ -public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils { +public class BulkNodeUpdateReplicaLevelTest extends NodeTestBase { - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeUpdateReplicaLevelTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(BulkNodeUpdateReplicaLevelTest.class.getName()); @Test public void testUpdateReplicaLevel() throws Exception { @@ -33,7 +33,7 @@ public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils { int shardsAfterReplica; - Settings settings = Settings.settingsBuilder() + Settings settings = Settings.builder() .put("index.number_of_shards", numberOfShards) .put("index.number_of_replicas", 0) .build(); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java index 0a35742..4b6dc41 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java @@ -3,15 +3,12 @@ package org.xbib.elasticsearch.extras.client.transport; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Before; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -19,7 +16,8 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -28,9 +26,7 @@ import static org.junit.Assert.assertFalse; /** * */ -public class BulkTransportClientTest extends NodeTestUtils { - - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportClientTest.class.getSimpleName()); +public class BulkTransportClientTest extends NodeTestBase { private static final Long MAX_ACTIONS = 1000L; @@ -47,22 +43,26 @@ public class BulkTransportClientTest extends NodeTestUtils { } @Test - public void testBulkClient() throws IOException { + public void testBulkClientIndexCreation() throws IOException { + logger.info("firing up BulkTransportClient"); final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); + logger.info("creating index"); client.newIndex("test"); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } assertFalse(client.hasThrowable()); try { + logger.info("deleting/creating index sequence start"); client.deleteIndex("test") .newIndex("test") .deleteIndex("test"); + logger.info("deleting/creating index sequence end"); } catch (NoNodeAvailableException e) { logger.error("no node available"); } finally { @@ -76,18 +76,24 @@ public class BulkTransportClientTest extends NodeTestUtils { @Test public void testSingleDocBulkClient() throws IOException { + logger.info("firing up BulkTransportClient"); final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); try { + logger.info("creating index"); client.newIndex("test"); + logger.info("indexing one doc"); client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest + logger.info("flush"); client.flushIngest(); + logger.info("wait for responses"); client.waitForResponses(TimeValue.timeValueSeconds(30)); + logger.info("waited for responses"); } catch (InterruptedException e) { // ignore } catch (ExecutionException e) { @@ -105,10 +111,10 @@ public class BulkTransportClientTest extends NodeTestUtils { } @Test - public void testRandomDocsBulkClient() throws IOException { + public void testRandomDocsBulkClient() { long numactions = NUM_ACTIONS; final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) @@ -127,7 +133,10 @@ public class BulkTransportClientTest extends NodeTestUtils { logger.error(e.getMessage(), e); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); + } catch (Throwable t) { + logger.error("unexcepted: " + t.getMessage(), t); } finally { + logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount()); assertEquals(numactions, client.getMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); @@ -138,34 +147,36 @@ public class BulkTransportClientTest extends NodeTestUtils { } @Test - public void testThreadedRandomDocsBulkClient() throws Exception { + public void testThreadedRandomDocsBulkClient() { int maxthreads = Runtime.getRuntime().availableProcessors(); long maxactions = MAX_ACTIONS; final long maxloop = NUM_ACTIONS; - - Settings settingsForIndex = Settings.settingsBuilder() - .put("index.number_of_shards", 2) - .put("index.number_of_replicas", 1) - .build(); - + logger.info("firing up client"); final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); try { + logger.info("new index"); + Settings settingsForIndex = Settings.builder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + .build(); client.newIndex("test", settingsForIndex, null) .startBulk("test", -1, 1000); - ThreadPoolExecutor pool = - EsExecutors.newFixed("bulkclient-test", maxthreads, 30, EsExecutors.daemonThreadFactory("bulkclient-test")); + logger.info("pool"); + ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { - pool.execute(() -> { + executorService.execute(() -> { + logger.info("executing runnable"); for (int i1 = 0; i1 < maxloop; i1++) { client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } + logger.info("done runnable"); latch.countDown(); }); } @@ -174,13 +185,16 @@ public class BulkTransportClientTest extends NodeTestUtils { logger.info("client flush ..."); client.flushIngest(); client.waitForResponses(TimeValue.timeValueSeconds(30)); - logger.info("thread pool to be shut down ..."); - pool.shutdown(); - logger.info("poot shut down"); + logger.info("executor service to be shut down ..."); + executorService.shutdown(); + logger.info("executor service is shut down"); + client.stopBulk("test"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); + } catch (Throwable t) { + logger.error("unexpected error: " + t.getMessage(), t); } finally { - client.stopBulk("test"); + logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); @@ -188,8 +202,7 @@ public class BulkTransportClientTest extends NodeTestUtils { assertFalse(client.hasThrowable()); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) - // to avoid NPE at org.elasticsearch.action.search.SearchRequest.writeTo(SearchRequest.java:580) - .setIndices("_all") + .setIndices("test") .setQuery(QueryBuilders.matchAllQuery()) .setSize(0); assertEquals(maxthreads * maxloop, @@ -197,5 +210,4 @@ public class BulkTransportClientTest extends NodeTestUtils { client.shutdown(); } } - } diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java index 00a4066..574928c 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java @@ -3,11 +3,9 @@ package org.xbib.elasticsearch.extras.client.transport; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -15,9 +13,10 @@ import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.*; -public class BulkTransportDuplicateIDTest extends NodeTestUtils { - - private final static ESLogger logger = ESLoggerFactory.getLogger(BulkTransportDuplicateIDTest.class.getSimpleName()); +/** + * + */ +public class BulkTransportDuplicateIDTest extends NodeTestBase { private final static Long MAX_ACTIONS = 1000L; @@ -27,7 +26,7 @@ public class BulkTransportDuplicateIDTest extends NodeTestUtils { public void testDuplicateDocIDs() throws Exception { long numactions = NUM_ACTIONS; final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java index 119688e..386f212 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java @@ -4,13 +4,11 @@ import org.elasticsearch.action.admin.indices.stats.*; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.indexing.IndexingStats; +import org.elasticsearch.index.shard.IndexingStats; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -24,9 +22,7 @@ import static org.junit.Assert.assertFalse; /** * */ -public class BulkTransportReplicaTest extends NodeTestUtils { - - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportReplicaTest.class.getSimpleName()); +public class BulkTransportReplicaTest extends NodeTestBase { @Test public void testReplicaLevel() throws Exception { @@ -36,18 +32,18 @@ public class BulkTransportReplicaTest extends NodeTestUtils { startNode("3"); startNode("4"); - Settings settingsTest1 = Settings.settingsBuilder() + Settings settingsTest1 = Settings.builder() .put("index.number_of_shards", 2) .put("index.number_of_replicas", 3) .build(); - Settings settingsTest2 = Settings.settingsBuilder() + Settings settingsTest2 = Settings.builder() .put("index.number_of_shards", 2) .put("index.number_of_replicas", 1) .build(); final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java index 8ed2c4a..331ac11 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java @@ -1,12 +1,12 @@ package org.xbib.elasticsearch.extras.client.transport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; -import org.xbib.elasticsearch.NodeTestUtils; +import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -17,10 +17,9 @@ import static org.junit.Assert.assertFalse; /** * */ -public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils { +public class BulkTransportUpdateReplicaLevelTest extends NodeTestBase { - private static final ESLogger logger = - ESLoggerFactory.getLogger(BulkTransportUpdateReplicaLevelTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(BulkTransportUpdateReplicaLevelTest.class.getName()); @Test public void testUpdateReplicaLevel() throws Exception { @@ -34,13 +33,13 @@ public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils { int shardsAfterReplica; - Settings settings = Settings.settingsBuilder() + Settings settings = Settings.builder() .put("index.number_of_shards", numberOfShards) .put("index.number_of_replicas", 0) .build(); final BulkTransportClient client = ClientBuilder.builder() - .put(getSettings()) + .put(getClientSettings()) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java index 877067b..f4ce3aa 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java @@ -1,5 +1,7 @@ package org.xbib.elasticsearch.extras.client; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ElasticsearchTimeoutException; @@ -39,8 +41,6 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; @@ -68,7 +68,7 @@ import java.util.regex.Pattern; */ public abstract class AbstractClient { - private static final ESLogger logger = ESLoggerFactory.getLogger(AbstractClient.class.getName()); + private static final Logger logger = LogManager.getLogger(AbstractClient.class.getName()); private Settings.Builder settingsBuilder; @@ -87,7 +87,7 @@ public abstract class AbstractClient { } public void resetSettings() { - this.settingsBuilder = Settings.settingsBuilder(); + this.settingsBuilder = Settings.builder(); settings = null; mappings = new HashMap<>(); } @@ -98,31 +98,31 @@ public abstract class AbstractClient { public void setting(String key, String value) { if (settingsBuilder == null) { - settingsBuilder = Settings.settingsBuilder(); + settingsBuilder = Settings.builder(); } settingsBuilder.put(key, value); } public void setting(String key, Boolean value) { if (settingsBuilder == null) { - settingsBuilder = Settings.settingsBuilder(); + settingsBuilder = Settings.builder(); } settingsBuilder.put(key, value); } public void setting(String key, Integer value) { if (settingsBuilder == null) { - settingsBuilder = Settings.settingsBuilder(); + settingsBuilder = Settings.builder(); } settingsBuilder.put(key, value); } public void setting(InputStream in) throws IOException { - settingsBuilder = Settings.settingsBuilder().loadFromStream(".json", in); + settingsBuilder = Settings.builder().loadFromStream(".json", in); } public Settings.Builder settingsBuilder() { - return settingsBuilder != null ? settingsBuilder : Settings.settingsBuilder(); + return settingsBuilder != null ? settingsBuilder : Settings.builder(); } public Settings settings() { @@ -130,7 +130,7 @@ public abstract class AbstractClient { return settings; } if (settingsBuilder == null) { - settingsBuilder = Settings.settingsBuilder(); + settingsBuilder = Settings.builder(); } return settingsBuilder.build(); } @@ -166,7 +166,7 @@ public abstract class AbstractClient { if (value == null) { throw new IOException("no value given"); } - Settings.Builder updateSettingsBuilder = Settings.settingsBuilder(); + Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) .settings(updateSettingsBuilder); @@ -218,7 +218,7 @@ public abstract class AbstractClient { new ClusterStateRequestBuilder(client(), ClusterStateAction.INSTANCE).all(); ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet(); String name = clusterStateResponse.getClusterName().value(); - int nodeCount = clusterStateResponse.getState().getNodes().size(); + int nodeCount = clusterStateResponse.getState().getNodes().getSize(); return name + " (" + nodeCount + " nodes connected)"; } catch (ElasticsearchTimeoutException e) { logger.warn(e.getMessage(), e); @@ -476,9 +476,9 @@ public abstract class AbstractClient { return null; } SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client(), SearchAction.INSTANCE); - SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); + SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); SearchResponse searchResponse = searchRequestBuilder.setIndices(index) - .addField(timestampfieldname) + .addStoredField(timestampfieldname) .setSize(1) .addSort(sort) .execute().actionGet(); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java index 814a7c7..b15c243 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -51,7 +50,7 @@ public class BulkProcessor implements Closeable { private BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) { this.bulkActions = bulkActions; - this.bulkSize = bulkSize.bytes(); + this.bulkSize = bulkSize.getBytes(); this.bulkRequest = new BulkRequest(); this.bulkRequestHandler = concurrentRequests == 0 ? @@ -176,18 +175,6 @@ public class BulkProcessor implements Closeable { executeIfNeeded(); } - public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) - throws Exception { - return add(data, defaultIndex, defaultType, null); - } - - public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, - @Nullable String defaultType, @Nullable Object payload) throws Exception { - bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true); - executeIfNeeded(); - return this; - } - private void executeIfNeeded() { ensureOpen(); if (!isOverTheLimit()) { @@ -441,7 +428,7 @@ public class BulkProcessor implements Closeable { } @Override - public void onFailure(Throwable e) { + public void onFailure(Exception e) { try { listener.afterBulk(executionId, bulkRequest, e); } finally { diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java b/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java index 9c5ffc2..277e13a 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/NetworkUtils.java @@ -1,7 +1,7 @@ package org.xbib.elasticsearch.extras.client; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.net.Inet4Address; @@ -20,7 +20,7 @@ import java.util.Locale; */ public class NetworkUtils { - private static final ESLogger logger = ESLoggerFactory.getLogger(NetworkUtils.class.getName()); + private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName()); private static final String IPV4_SETTING = "java.net.preferIPv4Stack"; diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java index 74a8dc4..17a0779 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java @@ -1,8 +1,8 @@ package org.xbib.elasticsearch.extras.client.node; -import com.google.common.collect.ImmutableSet; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; @@ -18,13 +18,12 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.xbib.elasticsearch.extras.client.AbstractClient; import org.xbib.elasticsearch.extras.client.BulkControl; @@ -44,7 +43,7 @@ import java.util.concurrent.TimeUnit; */ public class BulkNodeClient extends AbstractClient implements ClientMethods { - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeClient.class.getName()); + private static final Logger logger = LogManager.getLogger(BulkNodeClient.class.getName()); private int maxActionsPerRequest = DEFAULT_MAX_ACTIONS_PER_REQUEST; @@ -216,7 +215,11 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { version, effectiveSettings.getAsMap()); Collection> plugins = Collections.emptyList(); this.node = new BulkNode(new Environment(effectiveSettings), plugins); - node.start(); + try { + node.start(); + } catch (NodeValidationException e) { + throw new IOException(e); + } this.client = node.client(); } } @@ -389,7 +392,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { } if (control != null && control.indices() != null && !control.indices().isEmpty()) { logger.debug("stopping bulk mode for indices {}...", control.indices()); - for (String index : ImmutableSet.copyOf(control.indices())) { + for (String index : control.indices()) { stopBulk(index); } metric.stop(); @@ -505,7 +508,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { private class BulkNode extends Node { BulkNode(Environment env, Collection> classpathPlugins) { - super(env, Version.CURRENT, classpathPlugins); + super(env, classpathPlugins); } } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java index b03aeef..be51e62 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java @@ -1,6 +1,7 @@ package org.xbib.elasticsearch.extras.client.transport; -import com.google.common.collect.ImmutableSet; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; @@ -21,12 +22,13 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.transport.Netty4Plugin; import org.xbib.elasticsearch.extras.client.AbstractClient; import org.xbib.elasticsearch.extras.client.BulkControl; import org.xbib.elasticsearch.extras.client.BulkMetric; @@ -39,6 +41,7 @@ import java.io.InputStream; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -49,7 +52,9 @@ import java.util.concurrent.TimeUnit; */ public class BulkTransportClient extends AbstractClient implements ClientMethods { - private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportClient.class.getName()); + private static final Logger logger = LogManager.getLogger(BulkTransportClient.class.getName()); + + private static final Settings DEFAULT_SETTINGS = Settings.builder().put("transport.type.default", "local").build(); private int maxActionsPerRequest = DEFAULT_MAX_ACTIONS_PER_REQUEST; @@ -165,6 +170,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods builder.setBulkSize(maxVolumePerRequest); } this.bulkProcessor = builder.build(); + // aut-connect here try { Collection addrs = findAddresses(settings); if (!connect(addrs, settings.getAsBoolean("autodiscover", false))) { @@ -205,9 +211,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods + " " + System.getProperty("java.vm.version"); logger.info("creating transport client on {} with effective settings {}", version, settings.getAsMap()); - this.client = TransportClient.builder() - .settings(settings) - .build(); + this.client = new TransportClient(Settings.builder() + .put("cluster.name", settings.get("cluster.name")) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME) + .build(), Collections.singletonList(Netty4Plugin.class)); this.ignoreBulkErrors = settings.getAsBoolean("ignoreBulkErrors", true); } } @@ -313,7 +320,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods if (control == null) { return this; } - if (!control.isBulk(index)) { + if (!control.isBulk(index) && startRefreshIntervalSeconds > 0L && stopRefreshIntervalSeconds > 0L) { control.startBulk(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); updateIndexSetting(index, "refresh_interval", startRefreshIntervalSeconds + "s"); } @@ -326,7 +333,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods return this; } if (control.isBulk(index)) { - updateIndexSetting(index, "refresh_interval", control.getStopBulkRefreshIntervals().get(index) + "s"); + long secs = control.getStopBulkRefreshIntervals().get(index); + if (secs > 0L) { + updateIndexSetting(index, "refresh_interval", secs + "s"); + } control.finishBulk(index); } return this; @@ -461,7 +471,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods } if (control != null && control.indices() != null && !control.indices().isEmpty()) { logger.debug("stopping bulk mode for indices {}...", control.indices()); - for (String index : ImmutableSet.copyOf(control.indices())) { + for (String index : control.indices()) { stopBulk(index); } metric.stop(); @@ -485,7 +495,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods } private Settings findSettings() { - Settings.Builder settingsBuilder = Settings.settingsBuilder(); + Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put("host", "localhost"); try { String hostname = NetworkUtils.getLocalAddress().getHostName(); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java index 3912ce7..2ded1ab 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java @@ -1,9 +1,10 @@ package org.xbib.elasticsearch.extras.client.transport; -import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; -import com.google.common.collect.ImmutableMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -16,39 +17,41 @@ import org.elasticsearch.action.TransportActionNodeProxy; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessRequest; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; -import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.client.support.AbstractClient; -import org.elasticsearch.client.support.Headers; -import org.elasticsearch.client.transport.ClientTransportModule; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterNameModule; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.ModulesBuilder; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.indices.breaker.CircuitBreakerModule; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.node.Node; import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.PluginsModule; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.transport.FutureTransportResponseHandler; -import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -59,6 +62,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Stripped-down transport client without node sampling and without retrying. @@ -73,17 +77,13 @@ public class TransportClient extends AbstractClient { private final Injector injector; - private final ProxyActionMap proxyActionMap; - private final long pingTimeout; private final ClusterName clusterName; private final TransportService transportService; - private final Version minCompatibilityVersion; - - private final Headers headers; + private final ProxyActionMap proxy; private final AtomicInteger tempNodeId = new AtomicInteger(); @@ -99,20 +99,41 @@ public class TransportClient extends AbstractClient { private volatile boolean closed; - private TransportClient(Injector injector) { - super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), - injector.getInstance(Headers.class)); - this.injector = injector; - this.clusterName = injector.getInstance(ClusterName.class); - this.transportService = injector.getInstance(TransportService.class); - this.minCompatibilityVersion = injector.getInstance(Version.class).minimumCompatibilityVersion(); - this.headers = injector.getInstance(Headers.class); - this.pingTimeout = this.settings.getAsTime("client.transport.ping_timeout", timeValueSeconds(5)).millis(); - this.proxyActionMap = injector.getInstance(ProxyActionMap.class); + + /** + * Creates a new TransportClient with the given settings and plugins. + * @param settings settings + */ + public TransportClient(Settings settings) { + this(buildTemplate(settings, Settings.EMPTY, Collections.emptyList())); } - public static Builder builder() { - return new Builder(); + /** + * Creates a new TransportClient with the given settings and plugins. + * @param settings settings + * @param plugins plugins + */ + public TransportClient(Settings settings, Collection> plugins) { + this(buildTemplate(settings, Settings.EMPTY, plugins)); + } + + /** + * Creates a new TransportClient with the given settings, defaults and plugins. + * @param settings the client settings + * @param defaultSettings default settings that are merged after the plugins have added it's additional settings. + * @param plugins the client plugins + */ + protected TransportClient(Settings settings, Settings defaultSettings, Collection> plugins) { + this(buildTemplate(settings, defaultSettings, plugins)); + } + + private TransportClient(ClientTemplate template) { + super(template.getSettings(), template.getThreadPool()); + this.injector = template.injector; + this.clusterName = new ClusterName(template.getSettings().get("cluster.name", "elasticsearch")); + this.transportService = injector.getInstance(TransportService.class); + this.pingTimeout = this.settings.getAsTime("client.transport.ping_timeout", timeValueSeconds(5)).millis(); + this.proxy = template.proxy; } /** @@ -123,7 +144,7 @@ public class TransportClient extends AbstractClient { public List transportAddresses() { List lstBuilder = new ArrayList<>(); for (DiscoveryNode listedNode : listedNodes) { - lstBuilder.add(listedNode.address()); + lstBuilder.add(listedNode.getAddress()); } return Collections.unmodifiableList(lstBuilder); } @@ -170,7 +191,7 @@ public class TransportClient extends AbstractClient { public TransportClient addDiscoveryNodes(DiscoveryNodes discoveryNodes) { Collection addresses = new ArrayList<>(); for (DiscoveryNode discoveryNode : discoveryNodes) { - addresses.add((InetSocketTransportAddress) discoveryNode.address()); + addresses.add((InetSocketTransportAddress) discoveryNode.getAddress()); } addTransportAddresses(addresses); return this; @@ -185,7 +206,7 @@ public class TransportClient extends AbstractClient { for (TransportAddress transportAddress : transportAddresses) { boolean found = false; for (DiscoveryNode otherNode : listedNodes) { - if (otherNode.address().equals(transportAddress)) { + if (otherNode.getAddress().equals(transportAddress)) { found = true; logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode); break; @@ -202,7 +223,7 @@ public class TransportClient extends AbstractClient { discoveryNodeList.addAll(listedNodes()); for (TransportAddress transportAddress : filtered) { DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeId.incrementAndGet(), transportAddress, - minCompatibilityVersion); + Version.CURRENT.minimumCompatibilityVersion()); logger.debug("adding address [{}]", node); discoveryNodeList.add(node); } @@ -225,7 +246,7 @@ public class TransportClient extends AbstractClient { } List builder = new ArrayList<>(); for (DiscoveryNode otherNode : listedNodes) { - if (!otherNode.address().equals(transportAddress)) { + if (!otherNode.getAddress().equals(transportAddress)) { builder.add(otherNode); } else { logger.debug("removing address [{}]", otherNode); @@ -253,7 +274,7 @@ public class TransportClient extends AbstractClient { nodes = Collections.emptyList(); } injector.getInstance(TransportService.class).close(); - for (Class plugin : injector.getInstance(PluginsService.class).nodeServices()) { + for (Class plugin : injector.getInstance(PluginsService.class).getGuiceServiceClasses()) { injector.getInstance(plugin).close(); } try { @@ -261,7 +282,6 @@ public class TransportClient extends AbstractClient { } catch (Exception e) { logger.debug(e.getMessage(), e); } - injector.getInstance(PageCacheRecycler.class).close(); } private void connect() { @@ -279,7 +299,7 @@ public class TransportClient extends AbstractClient { } try { LivenessResponse livenessResponse = transportService.submitRequest(listedNode, - TransportLivenessAction.NAME, headers.applyTo(new LivenessRequest()), + TransportLivenessAction.NAME, new LivenessRequest(), TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE) .withTimeout(pingTimeout).build(), new FutureTransportResponseHandler() { @@ -293,9 +313,10 @@ public class TransportClient extends AbstractClient { newFilteredNodes.add(listedNode); } else if (livenessResponse.getDiscoveryNode() != null) { DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode(); - newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo.getHostName(), - nodeWithInfo.getHostAddress(), listedNode.address(), nodeWithInfo.attributes(), - nodeWithInfo.version())); + newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), + nodeWithInfo.getEphemeralId(), nodeWithInfo.getHostName(), + nodeWithInfo.getHostAddress(), listedNode.getAddress(), nodeWithInfo.getAttributes(), + nodeWithInfo.getRoles(), nodeWithInfo.getVersion())); } else { logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode); @@ -324,9 +345,9 @@ public class TransportClient extends AbstractClient { @Override @SuppressWarnings({"unchecked", "rawtypes"}) - protected > + protected , S extends ActionResponse, T extends ActionRequestBuilder> void doExecute(Action action, final R request, final ActionListener listener) { - final TransportActionNodeProxy proxyAction = proxyActionMap.getProxies().get(action); + final TransportActionNodeProxy proxyAction = proxy.getProxies().get(action); if (proxyAction == null) { throw new IllegalStateException("undefined action " + action); } @@ -347,89 +368,17 @@ public class TransportClient extends AbstractClient { } } - /** - * - */ - public static class Builder { - - private Settings settings = Settings.EMPTY; - private List> pluginClasses = new ArrayList<>(); - - public Builder settings(Settings.Builder settings) { - return settings(settings.build()); - } - - public Builder settings(Settings settings) { - this.settings = settings; - return this; - } - - public Builder addPlugin(Class pluginClass) { - pluginClasses.add(pluginClass); - return this; - } - - public TransportClient build() { - Settings transportClientSettings = settingsBuilder() - .put("transport.ping.schedule", this.settings.get("ping.interval", "30s")) - .put(InternalSettingsPreparer.prepareSettings(this.settings)) - .put("network.server", false) - .put("node.client", true) - .put(CLIENT_TYPE_SETTING, CLIENT_TYPE) - .build(); - PluginsService pluginsService = new PluginsService(transportClientSettings, null, null, pluginClasses); - this.settings = pluginsService.updatedSettings(); - Version version = Version.CURRENT; - final ThreadPool threadPool = new ThreadPool(transportClientSettings); - boolean success = false; - try { - ModulesBuilder modules = new ModulesBuilder(); - modules.add(new Version.Module(version)); - // plugin modules must be added here, before others or we can get crazy injection errors... - for (Module pluginModule : pluginsService.nodeModules()) { - modules.add(pluginModule); - } - modules.add(new PluginsModule(pluginsService)); - modules.add(new SettingsModule(this.settings)); - modules.add(new NetworkModule()); - modules.add(new ClusterNameModule(this.settings)); - modules.add(new ThreadPoolModule(threadPool)); - modules.add(new TransportModule(this.settings)); - modules.add(new SearchModule() { - @Override - protected void configure() { - // noop - } - }); - modules.add(new ActionModule(true)); - modules.add(new ClientTransportModule()); - modules.add(new CircuitBreakerModule(this.settings)); - pluginsService.processModules(modules); - Injector injector = modules.createInjector(); - injector.getInstance(TransportService.class).start(); - TransportClient transportClient = new TransportClient(injector); - success = true; - return transportClient; - } finally { - if (!success) { - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); - } - } - } - } - /** * The {@link ProxyActionMap} must be declared public. */ @SuppressWarnings({"unchecked", "rawtypes"}) - public static class ProxyActionMap { + private static class ProxyActionMap { - private final ImmutableMap proxies; + private final Map proxies; - @Inject - public ProxyActionMap(Settings settings, TransportService transportService, Map actions) { + public ProxyActionMap(Settings settings, TransportService transportService, List actions) { MapBuilder actionsBuilder = new MapBuilder<>(); - for (GenericAction action : actions.values()) { + for (GenericAction action : actions) { if (action instanceof Action) { actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService)); } @@ -437,9 +386,120 @@ public class TransportClient extends AbstractClient { this.proxies = actionsBuilder.immutableMap(); } - public ImmutableMap getProxies() { + Map getProxies() { return proxies; } } + + private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings, + Collection> plugins) { + if (!Node.NODE_NAME_SETTING.exists(providedSettings)) { + providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build(); + } + final PluginsService pluginsService = newPluginService(providedSettings, plugins); + final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build(); + final List resourcesToClose = new ArrayList<>(); + final ThreadPool threadPool = new ThreadPool(settings); + resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); + final NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + try { + final List> additionalSettings = new ArrayList<>(); + final List additionalSettingsFilter = new ArrayList<>(); + additionalSettings.addAll(pluginsService.getPluginSettings()); + additionalSettingsFilter.addAll(pluginsService.getPluginSettingsFilter()); + for (final ExecutorBuilder builder : threadPool.builders()) { + additionalSettings.addAll(builder.getRegisteredSettings()); + } + SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter); + NetworkModule networkModule = new NetworkModule(networkService, settings, true); + SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class)); + List entries = new ArrayList<>(); + entries.addAll(networkModule.getNamedWriteables()); + entries.addAll(searchModule.getNamedWriteables()); + entries.addAll(pluginsService.filterPlugins(Plugin.class).stream() + .flatMap(p -> p.getNamedWriteables().stream()) + .collect(Collectors.toList())); + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); + + ModulesBuilder modules = new ModulesBuilder(); + // plugin modules must be added here, before others or we can get crazy injection errors... + for (Module pluginModule : pluginsService.createGuiceModules()) { + modules.add(pluginModule); + } + modules.add(networkModule); + modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool)); + ActionModule actionModule = new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(), + pluginsService.filterPlugins(ActionPlugin.class)); + modules.add(actionModule); + + pluginsService.processModules(modules); + CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(), + settingsModule.getClusterSettings()); + resourcesToClose.add(circuitBreakerService); + BigArrays bigArrays = new BigArrays(settings, circuitBreakerService); + resourcesToClose.add(bigArrays); + modules.add(settingsModule); + modules.add((b -> { + b.bind(BigArrays.class).toInstance(bigArrays); + b.bind(PluginsService.class).toInstance(pluginsService); + b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); + b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); + })); + Injector injector = modules.createInjector(); + final TransportService transportService = injector.getInstance(TransportService.class); + final ProxyActionMap proxy = new ProxyActionMap(settings, transportService, + actionModule.getActions().values().stream() + .map(ActionPlugin.ActionHandler::getAction).collect(Collectors.toList())); + List pluginLifecycleComponents = new ArrayList<>(); + pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream() + .map(injector::getInstance).collect(Collectors.toList())); + resourcesToClose.addAll(pluginLifecycleComponents); + transportService.start(); + transportService.acceptIncomingRequests(); + ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, + proxy, namedWriteableRegistry); + resourcesToClose.clear(); + return transportClient; + } finally { + IOUtils.closeWhileHandlingException(resourcesToClose); + } + } + + private static final Logger logger = LogManager.getLogger(TransportClient.class); + + private static PluginsService newPluginService(final Settings settings, Collection> plugins) { + final Settings.Builder settingsBuilder = Settings.builder() + .put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval + .put(NetworkService.NETWORK_SERVER.getKey(), false) + .put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE); + if (!settings.isEmpty()) { + logger.info(settings.getAsMap()); + settingsBuilder.put(InternalSettingsPreparer.prepareSettings(settings)); + } + return new PluginsService(settingsBuilder.build(), null, null, plugins); + } + + private static final class ClientTemplate { + final Injector injector; + private final List pluginLifecycleComponents; + private final ProxyActionMap proxy; + private final NamedWriteableRegistry namedWriteableRegistry; + + private ClientTemplate(Injector injector, List pluginLifecycleComponents, + ProxyActionMap proxy, NamedWriteableRegistry namedWriteableRegistry) { + this.injector = injector; + this.pluginLifecycleComponents = pluginLifecycleComponents; + this.proxy = proxy; + this.namedWriteableRegistry = namedWriteableRegistry; + } + + Settings getSettings() { + return injector.getInstance(Settings.class); + } + + ThreadPool getThreadPool() { + return injector.getInstance(ThreadPool.class); + } + } }