From 25f529acd2bf7a4d3ae46ab355ce7f42ed1cf065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Sat, 1 Apr 2017 17:17:31 +0200 Subject: [PATCH] update to Elasticsearch 5.3.0 --- gradle.properties | 6 +- .../java/org/elasticsearch/node/MockNode.java | 1 - .../extras/client/ClusterBlockTest.java | 4 +- .../client/node/BulkNodeClientTest.java | 16 +-- .../transport/BulkTransportClientTest.java | 31 +----- .../extras/client/AbstractClient.java | 3 +- .../extras/client/BulkProcessor.java | 103 ++++++++---------- .../extras/client/node/BulkNodeClient.java | 7 +- .../client/transport/BulkTransportClient.java | 41 ++++--- .../client/transport/MockTransportClient.java | 1 + .../client/transport/TransportClient.java | 31 ++++-- 11 files changed, 128 insertions(+), 116 deletions(-) diff --git a/gradle.properties b/gradle.properties index 2a6c341..95c7fe8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,10 +1,10 @@ group = org.xbib name = elasticsearch-extras-client -version = 5.2.2.0 +version = 5.3.0.0 -elasticsearch-client-transport.version = 5.2.2 +elasticsearch-client-transport.version = 5.3.0 xbib-metrics.version = 1.0.0 -netty-transport-native-epoll.version = 4.1.6.Final +netty-transport-native-epoll.version = 4.1.7.Final log4j.version = 2.8 junit.version = 4.12 wagon.version = 2.10 diff --git a/src/integration-test/java/org/elasticsearch/node/MockNode.java b/src/integration-test/java/org/elasticsearch/node/MockNode.java index 10c9e86..7b5d5b3 100644 --- a/src/integration-test/java/org/elasticsearch/node/MockNode.java +++ b/src/integration-test/java/org/elasticsearch/node/MockNode.java @@ -1,7 +1,6 @@ package org.elasticsearch.node; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.plugins.Plugin; import java.util.ArrayList; diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/ClusterBlockTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/ClusterBlockTest.java index 07e492f..adfc417 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/ClusterBlockTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/ClusterBlockTest.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; import org.junit.Before; import org.junit.Test; import org.xbib.elasticsearch.NodeTestBase; @@ -45,7 +46,8 @@ public class ClusterBlockTest extends NodeTestBase { BulkRequestBuilder brb = client("1").prepareBulk(); XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject(); String jsonString = builder.string(); - IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1").setSource(jsonString); + IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1") + .setSource(jsonString, XContentType.JSON); brb.add(irb); brb.execute().actionGet(); } diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java index 4a295a7..421be5f 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java @@ -142,10 +142,11 @@ public class BulkNodeClientTest extends NodeTestBase { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - assertEquals(numactions, client.getMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } + logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getMetric().getSucceeded().getCount()); assertFalse(client.hasThrowable()); client.shutdown(); } @@ -164,8 +165,7 @@ public class BulkNodeClientTest extends NodeTestBase { .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); try { - client.newIndex("test") - .startBulk("test", 30 * 1000, 1000); + client.newIndex("test").startBulk("test", 30 * 1000, 1000); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { @@ -183,11 +183,12 @@ public class BulkNodeClientTest extends NodeTestBase { client.waitForResponses(TimeValue.timeValueSeconds(30)); logger.info("got all responses, executor service shutdown..."); executorService.shutdown(); - logger.info("pool is shut down"); + logger.info("executor service is shut down"); + client.stopBulk("test"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.stopBulk("test"); + logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); @@ -195,11 +196,12 @@ public class BulkNodeClientTest extends NodeTestBase { assertFalse(client.hasThrowable()); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) - .setQuery(QueryBuilders.matchAllQuery()).setSize(0); + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0); assertEquals(maxthreads * maxloop, searchRequestBuilder.execute().actionGet().getHits().getTotalHits()); client.shutdown(); } } - } diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java index 205aa88..92ed78c 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -22,7 +21,6 @@ import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; -import org.xbib.elasticsearch.extras.client.node.BulkNodeClient; import java.io.IOException; import java.util.concurrent.CountDownLatch; @@ -152,7 +150,7 @@ public class BulkTransportClientTest extends NodeTestBase { } @Test - public void testBulkTransportClientRandomDocs() { + public void testBulkTransportClientRandomDocs() throws Exception { long numactions = NUM_ACTIONS; final BulkTransportClient client = ClientBuilder.builder() .put(getClientSettings()) @@ -168,31 +166,25 @@ public class BulkTransportClientTest extends NodeTestBase { } client.flushIngest(); client.waitForResponses(TimeValue.timeValueSeconds(30)); - } catch (InterruptedException e) { - // ignore - } catch (ExecutionException e) { - logger.error(e.getMessage(), e); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); - } catch (Throwable t) { - logger.error("unexcepted: " + t.getMessage(), t); } finally { - logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount()); - assertEquals(numactions, client.getMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } + logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getMetric().getSucceeded().getCount()); assertFalse(client.hasThrowable()); client.shutdown(); } } @Test - public void testBulkTransportClientThreadedRandomDocs() { + public void testBulkTransportClientThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); long maxactions = MAX_ACTIONS; final long maxloop = NUM_ACTIONS; - logger.info("firing up client"); + logger.info("TransportClient max={} maxactions={} maxloop={}", maxthreads, maxactions, maxloop); final BulkTransportClient client = ClientBuilder.builder() .put(getClientSettings()) .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) @@ -201,23 +193,14 @@ public class BulkTransportClientTest extends NodeTestBase { .setControl(new SimpleBulkControl()) .toBulkTransportClient(); try { - logger.info("new index"); - Settings settingsForIndex = Settings.builder() - .put("index.number_of_shards", 2) - .put("index.number_of_replicas", 1) - .build(); - client.newIndex("test", settingsForIndex, null) - .startBulk("test", -1, 1000); - logger.info("pool"); + client.newIndex("test").startBulk("test", 30 * 1000, 1000); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { executorService.execute(() -> { - logger.info("executing runnable"); for (int i1 = 0; i1 < maxloop; i1++) { client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } - logger.info("done runnable"); latch.countDown(); }); } @@ -232,8 +215,6 @@ public class BulkTransportClientTest extends NodeTestBase { client.stopBulk("test"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); - } catch (Throwable t) { - logger.error("unexpected error: " + t.getMessage(), t); } finally { logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java index ec25b81..20a6e3c 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java @@ -43,6 +43,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; @@ -285,7 +286,7 @@ public abstract class AbstractClient { if (!mappings().isEmpty()) { for (Map.Entry me : mappings().entrySet()) { client().execute(PutMappingAction.INSTANCE, - new PutMappingRequest(index).type(me.getKey()).source(me.getValue())).actionGet(); + new PutMappingRequest(index).type(me.getKey()).source(me.getValue(), XContentType.JSON)).actionGet(); } } } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java index 56a72e5..593d9cc 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java @@ -1,12 +1,12 @@ package org.xbib.elasticsearch.extras.client; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -117,7 +117,7 @@ public class BulkProcessor implements Closeable { if (bulkRequest.numberOfActions() > 0) { execute(); } - return this.bulkRequestHandler.awaitClose(timeout, unit); + return bulkRequestHandler.awaitClose(timeout, unit); } /** @@ -127,8 +127,16 @@ public class BulkProcessor implements Closeable { * @param request request * @return his bulk processor */ - public BulkProcessor add(IndexRequest request) { - return add((ActionRequest) request); + public synchronized BulkProcessor add(IndexRequest request) { + if (request == null) { + return this; + } + ensureOpen(); + bulkRequest.add(request); + if (isOverTheLimit()) { + execute(); + } + return this; } /** @@ -137,64 +145,53 @@ public class BulkProcessor implements Closeable { * @param request request * @return his bulk processor */ - public BulkProcessor add(DeleteRequest request) { - return add((ActionRequest) request); - } - - /** - * Adds either a delete or an index request. - * - * @param request request - * @return his bulk processor - */ - public BulkProcessor add(ActionRequest request) { - return add(request, null); - } - - /** - * Adds either a delete or an index request with a payload. - * - * @param request request - * @param payload payload - * @return his bulk processor - */ - public BulkProcessor add(ActionRequest request, @Nullable Object payload) { - internalAdd(request, payload); + public synchronized BulkProcessor add(DeleteRequest request) { + if (request == null) { + return this; + } + ensureOpen(); + bulkRequest.add(request); + if (isOverTheLimit()) { + execute(); + } return this; } - protected void ensureOpen() { + /** + * Adds an {@link org.elasticsearch.action.update.UpdateRequest} to the list of actions to execute. + * + * @param request request + * @return his bulk processor + */ + public synchronized BulkProcessor add(UpdateRequest request) { + if (request == null) { + return this; + } + ensureOpen(); + bulkRequest.add(request); + if (isOverTheLimit()) { + execute(); + } + return this; + } + + private void ensureOpen() { if (closed) { throw new IllegalStateException("bulk process already closed"); } } - private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) { - ensureOpen(); - bulkRequest.add(request, payload); - executeIfNeeded(); - } - - private void executeIfNeeded() { - ensureOpen(); - if (!isOverTheLimit()) { - return; - } - execute(); + private boolean isOverTheLimit() { + final int count = bulkRequest.numberOfActions(); + return count > 0 && + (bulkActions != -1 && count >= bulkActions) || + (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize); } private void execute() { final BulkRequest myBulkRequest = this.bulkRequest; - final long executionId = executionIdGen.incrementAndGet(); + bulkRequestHandler.execute(myBulkRequest, executionIdGen.incrementAndGet()); this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler.execute(myBulkRequest, executionId); - } - - private boolean isOverTheLimit() { - return bulkActions != -1 && - bulkRequest.numberOfActions() >= bulkActions || - bulkSize != -1 && - bulkRequest.estimatedSizeInBytes() >= bulkSize; } /** @@ -347,17 +344,13 @@ public class BulkProcessor implements Closeable { if (closed) { return; } - if (bulkRequest.numberOfActions() == 0) { - return; + if (bulkRequest.numberOfActions() > 0) { + execute(); } - execute(); } } } - /** - * Abstracts the low-level details of bulk request handling. - */ interface BulkRequestHandler { void execute(BulkRequest bulkRequest, long executionId); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java index bac6bca..7f8edb0 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; @@ -241,7 +242,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { if (metric != null) { metric.getCurrentIngest().inc(index, type, id); } - bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source)); + bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source, XContentType.JSON)); } catch (Exception e) { throwable = e; closed = true; @@ -313,7 +314,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { if (metric != null) { metric.getCurrentIngest().inc(index, type, id); } - bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source)); + bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source, XContentType.JSON)); } catch (Exception e) { throwable = e; closed = true; @@ -446,7 +447,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { String type = entry.getKey(); String mapping = entry.getValue(); logger.info("found mapping for {}", type); - createIndexRequestBuilder.addMapping(type, mapping); + createIndexRequestBuilder.addMapping(type, mapping, XContentType.JSON); } } createIndexRequestBuilder.execute().actionGet(); diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java index 3d159ab..2a8193f 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.transport.Netty4Plugin; import org.xbib.elasticsearch.extras.client.AbstractClient; import org.xbib.elasticsearch.extras.client.BulkControl; @@ -293,7 +294,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods String type = entry.getKey(); String mapping = entry.getValue(); logger.info("found mapping for {}", type); - createIndexRequestBuilder.addMapping(type, mapping); + createIndexRequestBuilder.addMapping(type, mapping, XContentType.JSON); } } createIndexRequestBuilder.execute().actionGet(); @@ -348,8 +349,11 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods throwClose(); } try { - metric.getCurrentIngest().inc(index, type, id); - bulkProcessor.add(new IndexRequest().index(index).type(type).id(id).create(false).source(source)); + if (metric != null) { + metric.getCurrentIngest().inc(index, type, id); + } + bulkProcessor.add(new IndexRequest().index(index).type(type).id(id).create(false) + .source(source, XContentType.JSON)); } catch (Exception e) { throwable = e; closed = true; @@ -364,7 +368,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods throwClose(); } try { - metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); + if (metric != null) { + metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); + } bulkProcessor.add(indexRequest); } catch (Exception e) { throwable = e; @@ -380,7 +386,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods throwClose(); } try { - metric.getCurrentIngest().inc(index, type, id); + if (metric != null) { + metric.getCurrentIngest().inc(index, type, id); + } bulkProcessor.add(new DeleteRequest().index(index).type(type).id(id)); } catch (Exception e) { throwable = e; @@ -396,7 +404,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods throwClose(); } try { - metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); + if (metric != null) { + metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); + } bulkProcessor.add(deleteRequest); } catch (Exception e) { throwable = e; @@ -412,8 +422,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods throwClose(); } try { - metric.getCurrentIngest().inc(index, type, id); - bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source)); + if (metric != null) { + metric.getCurrentIngest().inc(index, type, id); + } + bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source, XContentType.JSON)); } catch (Exception e) { throwable = e; closed = true; @@ -428,7 +440,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods throwClose(); } try { - metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); + if (metric != null) { + metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); + } bulkProcessor.add(updateRequest); } catch (Exception e) { throwable = e; @@ -439,7 +453,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods } @Override - public synchronized BulkTransportClient flushIngest() { + public BulkTransportClient flushIngest() { if (closed) { throwClose(); } @@ -449,12 +463,13 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods } @Override - public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) - throws InterruptedException, ExecutionException { + public BulkTransportClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException { if (closed) { throwClose(); } - bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); + if (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) { + logger.warn("still waiting for responses"); + } return this; } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java index 86199d2..a25d012 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java @@ -1,5 +1,6 @@ package org.xbib.elasticsearch.extras.client.transport; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java index c639715..6707982 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/TransportClient.java @@ -1,7 +1,7 @@ package org.xbib.elasticsearch.extras.client.transport; -import static java.util.stream.Collectors.toList; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static java.util.stream.Collectors.toList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -23,6 +23,7 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; @@ -39,8 +40,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -56,6 +57,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.io.Closeable; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -78,6 +80,8 @@ import java.util.stream.Stream; */ public class TransportClient extends AbstractClient { + private static final Logger logger = LogManager.getLogger(TransportClient.class); + private static final String CLIENT_TYPE = "transport"; private final Injector injector; @@ -452,8 +456,11 @@ public class TransportClient extends AbstractClient { } modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool)); ActionModule actionModule = new ActionModule(true, settings, null, - settingsModule.getClusterSettings(), threadPool, - pluginsService.filterPlugins(ActionPlugin.class)); + settingsModule.getIndexScopedSettings(), + settingsModule.getClusterSettings(), + settingsModule.getSettingsFilter(), + threadPool, + pluginsService.filterPlugins(ActionPlugin.class), null, null); modules.add(actionModule); CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); @@ -464,10 +471,12 @@ public class TransportClient extends AbstractClient { NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, - xContentRegistry, networkService); + xContentRegistry, networkService, null); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = new TransportService(settings, transport, threadPool, - networkModule.getTransportInterceptor(), null); + networkModule.getTransportInterceptor(), boundTransportAddress -> + DiscoveryNode.createLocal(settings, dummyAddress(networkModule), UUIDs.randomBase64UUID()), + null); modules.add((b -> { b.bind(BigArrays.class).toInstance(bigArrays); b.bind(PluginsService.class).toInstance(pluginsService); @@ -495,7 +504,15 @@ public class TransportClient extends AbstractClient { } } - private static final Logger logger = LogManager.getLogger(TransportClient.class); + private static TransportAddress dummyAddress(NetworkModule networkModule) { + final TransportAddress address; + try { + address = networkModule.getTransportSupplier().get().addressesFromString("0.0.0.0:0", 1)[0]; + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + return address; + } private static PluginsService newPluginService(final Settings settings, Collection> plugins) { final Settings.Builder settingsBuilder = Settings.builder()