update to Elasticsearch 5.3.0

This commit is contained in:
Jörg Prante 2017-04-01 17:17:31 +02:00
parent f181871948
commit 25f529acd2
11 changed files with 128 additions and 116 deletions

View file

@ -1,10 +1,10 @@
group = org.xbib group = org.xbib
name = elasticsearch-extras-client name = elasticsearch-extras-client
version = 5.2.2.0 version = 5.3.0.0
elasticsearch-client-transport.version = 5.2.2 elasticsearch-client-transport.version = 5.3.0
xbib-metrics.version = 1.0.0 xbib-metrics.version = 1.0.0
netty-transport-native-epoll.version = 4.1.6.Final netty-transport-native-epoll.version = 4.1.7.Final
log4j.version = 2.8 log4j.version = 2.8
junit.version = 4.12 junit.version = 4.12
wagon.version = 2.10 wagon.version = 2.10

View file

@ -1,7 +1,6 @@
package org.elasticsearch.node; package org.elasticsearch.node;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList; import java.util.ArrayList;

View file

@ -9,6 +9,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestBase; import org.xbib.elasticsearch.NodeTestBase;
@ -45,7 +46,8 @@ public class ClusterBlockTest extends NodeTestBase {
BulkRequestBuilder brb = client("1").prepareBulk(); BulkRequestBuilder brb = client("1").prepareBulk();
XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject(); XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject();
String jsonString = builder.string(); String jsonString = builder.string();
IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1").setSource(jsonString); IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1")
.setSource(jsonString, XContentType.JSON);
brb.add(irb); brb.add(irb);
brb.execute().actionGet(); brb.execute().actionGet();
} }

View file

@ -142,10 +142,11 @@ public class BulkNodeClientTest extends NodeTestBase {
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount());
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
client.shutdown(); client.shutdown();
} }
@ -164,8 +165,7 @@ public class BulkNodeClientTest extends NodeTestBase {
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
try { try {
client.newIndex("test") client.newIndex("test").startBulk("test", 30 * 1000, 1000);
.startBulk("test", 30 * 1000, 1000);
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
final CountDownLatch latch = new CountDownLatch(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) { for (int i = 0; i < maxthreads; i++) {
@ -183,11 +183,12 @@ public class BulkNodeClientTest extends NodeTestBase {
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses(TimeValue.timeValueSeconds(30));
logger.info("got all responses, executor service shutdown..."); logger.info("got all responses, executor service shutdown...");
executorService.shutdown(); executorService.shutdown();
logger.info("pool is shut down"); logger.info("executor service is shut down");
client.stopBulk("test");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
client.stopBulk("test"); logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount());
assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
@ -195,11 +196,12 @@ public class BulkNodeClientTest extends NodeTestBase {
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
client.refreshIndex("test"); client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0); .setIndices("test")
.setQuery(QueryBuilders.matchAllQuery())
.setSize(0);
assertEquals(maxthreads * maxloop, assertEquals(maxthreads * maxloop,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits()); searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
client.shutdown(); client.shutdown();
} }
} }
} }

View file

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
@ -22,7 +21,6 @@ import org.xbib.elasticsearch.NodeTestBase;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.ClientBuilder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import org.xbib.elasticsearch.extras.client.node.BulkNodeClient;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -152,7 +150,7 @@ public class BulkTransportClientTest extends NodeTestBase {
} }
@Test @Test
public void testBulkTransportClientRandomDocs() { public void testBulkTransportClientRandomDocs() throws Exception {
long numactions = NUM_ACTIONS; long numactions = NUM_ACTIONS;
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = ClientBuilder.builder()
.put(getClientSettings()) .put(getClientSettings())
@ -168,31 +166,25 @@ public class BulkTransportClientTest extends NodeTestBase {
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses(TimeValue.timeValueSeconds(30));
} catch (InterruptedException e) {
// ignore
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} catch (Throwable t) {
logger.error("unexcepted: " + t.getMessage(), t);
} finally { } finally {
logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount());
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
logger.info("assuring {} == {}", numactions, client.getMetric().getSucceeded().getCount());
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
client.shutdown(); client.shutdown();
} }
} }
@Test @Test
public void testBulkTransportClientThreadedRandomDocs() { public void testBulkTransportClientThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors(); int maxthreads = Runtime.getRuntime().availableProcessors();
long maxactions = MAX_ACTIONS; long maxactions = MAX_ACTIONS;
final long maxloop = NUM_ACTIONS; final long maxloop = NUM_ACTIONS;
logger.info("firing up client"); logger.info("TransportClient max={} maxactions={} maxloop={}", maxthreads, maxactions, maxloop);
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = ClientBuilder.builder()
.put(getClientSettings()) .put(getClientSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions)
@ -201,23 +193,14 @@ public class BulkTransportClientTest extends NodeTestBase {
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkTransportClient(); .toBulkTransportClient();
try { try {
logger.info("new index"); client.newIndex("test").startBulk("test", 30 * 1000, 1000);
Settings settingsForIndex = Settings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build();
client.newIndex("test", settingsForIndex, null)
.startBulk("test", -1, 1000);
logger.info("pool");
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
final CountDownLatch latch = new CountDownLatch(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) { for (int i = 0; i < maxthreads; i++) {
executorService.execute(() -> { executorService.execute(() -> {
logger.info("executing runnable");
for (int i1 = 0; i1 < maxloop; i1++) { for (int i1 = 0; i1 < maxloop; i1++) {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
logger.info("done runnable");
latch.countDown(); latch.countDown();
}); });
} }
@ -232,8 +215,6 @@ public class BulkTransportClientTest extends NodeTestBase {
client.stopBulk("test"); client.stopBulk("test");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} catch (Throwable t) {
logger.error("unexpected error: " + t.getMessage(), t);
} finally { } finally {
logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); logger.info("assuring {} == {}", maxthreads * maxloop, client.getMetric().getSucceeded().getCount());
assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount());

View file

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
@ -285,7 +286,7 @@ public abstract class AbstractClient {
if (!mappings().isEmpty()) { if (!mappings().isEmpty()) {
for (Map.Entry<String, String> me : mappings().entrySet()) { for (Map.Entry<String, String> me : mappings().entrySet()) {
client().execute(PutMappingAction.INSTANCE, client().execute(PutMappingAction.INSTANCE,
new PutMappingRequest(index).type(me.getKey()).source(me.getValue())).actionGet(); new PutMappingRequest(index).type(me.getKey()).source(me.getValue(), XContentType.JSON)).actionGet();
} }
} }
} }

View file

@ -1,12 +1,12 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elasticsearch.extras.client;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -117,7 +117,7 @@ public class BulkProcessor implements Closeable {
if (bulkRequest.numberOfActions() > 0) { if (bulkRequest.numberOfActions() > 0) {
execute(); execute();
} }
return this.bulkRequestHandler.awaitClose(timeout, unit); return bulkRequestHandler.awaitClose(timeout, unit);
} }
/** /**
@ -127,8 +127,16 @@ public class BulkProcessor implements Closeable {
* @param request request * @param request request
* @return his bulk processor * @return his bulk processor
*/ */
public BulkProcessor add(IndexRequest request) { public synchronized BulkProcessor add(IndexRequest request) {
return add((ActionRequest) request); if (request == null) {
return this;
}
ensureOpen();
bulkRequest.add(request);
if (isOverTheLimit()) {
execute();
}
return this;
} }
/** /**
@ -137,64 +145,53 @@ public class BulkProcessor implements Closeable {
* @param request request * @param request request
* @return his bulk processor * @return his bulk processor
*/ */
public BulkProcessor add(DeleteRequest request) { public synchronized BulkProcessor add(DeleteRequest request) {
return add((ActionRequest) request); if (request == null) {
return this;
} }
ensureOpen();
/** bulkRequest.add(request);
* Adds either a delete or an index request. if (isOverTheLimit()) {
* execute();
* @param request request
* @return his bulk processor
*/
public BulkProcessor add(ActionRequest request) {
return add(request, null);
} }
/**
* Adds either a delete or an index request with a payload.
*
* @param request request
* @param payload payload
* @return his bulk processor
*/
public BulkProcessor add(ActionRequest request, @Nullable Object payload) {
internalAdd(request, payload);
return this; return this;
} }
protected void ensureOpen() { /**
* Adds an {@link org.elasticsearch.action.update.UpdateRequest} to the list of actions to execute.
*
* @param request request
* @return his bulk processor
*/
public synchronized BulkProcessor add(UpdateRequest request) {
if (request == null) {
return this;
}
ensureOpen();
bulkRequest.add(request);
if (isOverTheLimit()) {
execute();
}
return this;
}
private void ensureOpen() {
if (closed) { if (closed) {
throw new IllegalStateException("bulk process already closed"); throw new IllegalStateException("bulk process already closed");
} }
} }
private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) { private boolean isOverTheLimit() {
ensureOpen(); final int count = bulkRequest.numberOfActions();
bulkRequest.add(request, payload); return count > 0 &&
executeIfNeeded(); (bulkActions != -1 && count >= bulkActions) ||
} (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize);
private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {
return;
}
execute();
} }
private void execute() { private void execute() {
final BulkRequest myBulkRequest = this.bulkRequest; final BulkRequest myBulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet(); bulkRequestHandler.execute(myBulkRequest, executionIdGen.incrementAndGet());
this.bulkRequest = new BulkRequest(); this.bulkRequest = new BulkRequest();
this.bulkRequestHandler.execute(myBulkRequest, executionId);
}
private boolean isOverTheLimit() {
return bulkActions != -1 &&
bulkRequest.numberOfActions() >= bulkActions ||
bulkSize != -1 &&
bulkRequest.estimatedSizeInBytes() >= bulkSize;
} }
/** /**
@ -347,17 +344,13 @@ public class BulkProcessor implements Closeable {
if (closed) { if (closed) {
return; return;
} }
if (bulkRequest.numberOfActions() == 0) { if (bulkRequest.numberOfActions() > 0) {
return;
}
execute(); execute();
} }
} }
} }
}
/**
* Abstracts the low-level details of bulk request handling.
*/
interface BulkRequestHandler { interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId); void execute(BulkRequest bulkRequest, long executionId);

View file

@ -21,6 +21,7 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.node.NodeValidationException;
@ -241,7 +242,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
if (metric != null) { if (metric != null) {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
} }
bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source)); bulkProcessor.add(new IndexRequest(index).type(type).id(id).create(false).source(source, XContentType.JSON));
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
closed = true; closed = true;
@ -313,7 +314,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
if (metric != null) { if (metric != null) {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
} }
bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source)); bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source, XContentType.JSON));
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
closed = true; closed = true;
@ -446,7 +447,7 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
String type = entry.getKey(); String type = entry.getKey();
String mapping = entry.getValue(); String mapping = entry.getValue();
logger.info("found mapping for {}", type); logger.info("found mapping for {}", type);
createIndexRequestBuilder.addMapping(type, mapping); createIndexRequestBuilder.addMapping(type, mapping, XContentType.JSON);
} }
} }
createIndexRequestBuilder.execute().actionGet(); createIndexRequestBuilder.execute().actionGet();

View file

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.Netty4Plugin; import org.elasticsearch.transport.Netty4Plugin;
import org.xbib.elasticsearch.extras.client.AbstractClient; import org.xbib.elasticsearch.extras.client.AbstractClient;
import org.xbib.elasticsearch.extras.client.BulkControl; import org.xbib.elasticsearch.extras.client.BulkControl;
@ -293,7 +294,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
String type = entry.getKey(); String type = entry.getKey();
String mapping = entry.getValue(); String mapping = entry.getValue();
logger.info("found mapping for {}", type); logger.info("found mapping for {}", type);
createIndexRequestBuilder.addMapping(type, mapping); createIndexRequestBuilder.addMapping(type, mapping, XContentType.JSON);
} }
} }
createIndexRequestBuilder.execute().actionGet(); createIndexRequestBuilder.execute().actionGet();
@ -348,8 +349,11 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
throwClose(); throwClose();
} }
try { try {
if (metric != null) {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
bulkProcessor.add(new IndexRequest().index(index).type(type).id(id).create(false).source(source)); }
bulkProcessor.add(new IndexRequest().index(index).type(type).id(id).create(false)
.source(source, XContentType.JSON));
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
closed = true; closed = true;
@ -364,7 +368,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
throwClose(); throwClose();
} }
try { try {
if (metric != null) {
metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
bulkProcessor.add(indexRequest); bulkProcessor.add(indexRequest);
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
@ -380,7 +386,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
throwClose(); throwClose();
} }
try { try {
if (metric != null) {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
}
bulkProcessor.add(new DeleteRequest().index(index).type(type).id(id)); bulkProcessor.add(new DeleteRequest().index(index).type(type).id(id));
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
@ -396,7 +404,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
throwClose(); throwClose();
} }
try { try {
if (metric != null) {
metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
}
bulkProcessor.add(deleteRequest); bulkProcessor.add(deleteRequest);
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
@ -412,8 +422,10 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
throwClose(); throwClose();
} }
try { try {
if (metric != null) {
metric.getCurrentIngest().inc(index, type, id); metric.getCurrentIngest().inc(index, type, id);
bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source)); }
bulkProcessor.add(new UpdateRequest().index(index).type(type).id(id).upsert(source, XContentType.JSON));
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
closed = true; closed = true;
@ -428,7 +440,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
throwClose(); throwClose();
} }
try { try {
if (metric != null) {
metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
}
bulkProcessor.add(updateRequest); bulkProcessor.add(updateRequest);
} catch (Exception e) { } catch (Exception e) {
throwable = e; throwable = e;
@ -439,7 +453,7 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
} }
@Override @Override
public synchronized BulkTransportClient flushIngest() { public BulkTransportClient flushIngest() {
if (closed) { if (closed) {
throwClose(); throwClose();
} }
@ -449,12 +463,13 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
} }
@Override @Override
public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) public BulkTransportClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException {
throws InterruptedException, ExecutionException {
if (closed) { if (closed) {
throwClose(); throwClose();
} }
bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); if (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) {
logger.warn("still waiting for responses");
}
return this; return this;
} }

View file

@ -1,5 +1,6 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elasticsearch.extras.client.transport;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;

View file

@ -1,7 +1,7 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elasticsearch.extras.client.transport;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static java.util.stream.Collectors.toList;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -23,6 +23,7 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injector;
@ -39,8 +40,8 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
@ -56,6 +57,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.Closeable; import java.io.Closeable;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -78,6 +80,8 @@ import java.util.stream.Stream;
*/ */
public class TransportClient extends AbstractClient { public class TransportClient extends AbstractClient {
private static final Logger logger = LogManager.getLogger(TransportClient.class);
private static final String CLIENT_TYPE = "transport"; private static final String CLIENT_TYPE = "transport";
private final Injector injector; private final Injector injector;
@ -452,8 +456,11 @@ public class TransportClient extends AbstractClient {
} }
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool)); modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, ActionModule actionModule = new ActionModule(true, settings, null,
settingsModule.getClusterSettings(), threadPool, settingsModule.getIndexScopedSettings(),
pluginsService.filterPlugins(ActionPlugin.class)); settingsModule.getClusterSettings(),
settingsModule.getSettingsFilter(),
threadPool,
pluginsService.filterPlugins(ActionPlugin.class), null, null);
modules.add(actionModule); modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(), CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings()); settingsModule.getClusterSettings());
@ -464,10 +471,12 @@ public class TransportClient extends AbstractClient {
NetworkModule networkModule = new NetworkModule(settings, true, NetworkModule networkModule = new NetworkModule(settings, true,
pluginsService.filterPlugins(NetworkPlugin.class), threadPool, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, circuitBreakerService, namedWriteableRegistry, bigArrays, circuitBreakerService, namedWriteableRegistry,
xContentRegistry, networkService); xContentRegistry, networkService, null);
final Transport transport = networkModule.getTransportSupplier().get(); final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool, final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), null); networkModule.getTransportInterceptor(), boundTransportAddress ->
DiscoveryNode.createLocal(settings, dummyAddress(networkModule), UUIDs.randomBase64UUID()),
null);
modules.add((b -> { modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays); b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService); b.bind(PluginsService.class).toInstance(pluginsService);
@ -495,7 +504,15 @@ public class TransportClient extends AbstractClient {
} }
} }
private static final Logger logger = LogManager.getLogger(TransportClient.class); private static TransportAddress dummyAddress(NetworkModule networkModule) {
final TransportAddress address;
try {
address = networkModule.getTransportSupplier().get().addressesFromString("0.0.0.0:0", 1)[0];
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
return address;
}
private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) { private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder() final Settings.Builder settingsBuilder = Settings.builder()