working on dynamic bulk requests
This commit is contained in:
parent
ed32b98383
commit
03024d489a
11 changed files with 178 additions and 172 deletions
|
@ -2,11 +2,12 @@ package org.xbib.elx.api;
|
||||||
|
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import java.io.Closeable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A bulk listener for following executions of bulk operations.
|
* A bulk listener for following executions of bulk operations.
|
||||||
*/
|
*/
|
||||||
public interface BulkListener {
|
public interface BulkListener extends Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback before the bulk is executed.
|
* Callback before the bulk is executed.
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package org.xbib.elx.api;
|
package org.xbib.elx.api;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.xbib.metrics.api.Count;
|
import org.xbib.metrics.api.Count;
|
||||||
import org.xbib.metrics.api.Metered;
|
import org.xbib.metrics.api.Metered;
|
||||||
|
|
||||||
|
@ -8,8 +7,6 @@ import java.io.Closeable;
|
||||||
|
|
||||||
public interface BulkMetric extends Closeable {
|
public interface BulkMetric extends Closeable {
|
||||||
|
|
||||||
void init(Settings settings);
|
|
||||||
|
|
||||||
void markTotalIngest(long n);
|
void markTotalIngest(long n);
|
||||||
|
|
||||||
Metered getTotalIngest();
|
Metered getTotalIngest();
|
||||||
|
|
|
@ -4,6 +4,7 @@ import org.elasticsearch.action.ActionRequest;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.Flushable;
|
import java.io.Flushable;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public interface BulkProcessor extends Closeable, Flushable {
|
public interface BulkProcessor extends Closeable, Flushable {
|
||||||
|
@ -16,9 +17,9 @@ public interface BulkProcessor extends Closeable, Flushable {
|
||||||
|
|
||||||
long getBulkSize();
|
long getBulkSize();
|
||||||
|
|
||||||
BulkProcessor add(ActionRequest<?> request);
|
BulkRequestHandler getBulkRequestHandler();
|
||||||
|
|
||||||
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
|
void add(ActionRequest<?> request);
|
||||||
|
|
||||||
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
|
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,18 @@
|
||||||
package org.xbib.elx.api;
|
package org.xbib.elx.api;
|
||||||
|
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public interface BulkRequestHandler {
|
public interface BulkRequestHandler {
|
||||||
|
|
||||||
void execute(BulkRequest bulkRequest, long executionId);
|
void execute(BulkRequest bulkRequest);
|
||||||
|
|
||||||
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
|
boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException;
|
||||||
|
|
||||||
|
int getPermits();
|
||||||
|
|
||||||
|
void increase();
|
||||||
|
|
||||||
|
void reduce();
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.xbib.elx.api.BulkClient;
|
import org.xbib.elx.api.BulkClient;
|
||||||
import org.xbib.elx.api.BulkController;
|
import org.xbib.elx.api.BulkController;
|
||||||
import org.xbib.elx.api.BulkListener;
|
|
||||||
import org.xbib.elx.api.BulkMetric;
|
import org.xbib.elx.api.BulkMetric;
|
||||||
import org.xbib.elx.api.BulkProcessor;
|
import org.xbib.elx.api.BulkProcessor;
|
||||||
import org.xbib.elx.api.IndexDefinition;
|
import org.xbib.elx.api.IndexDefinition;
|
||||||
|
@ -25,11 +24,9 @@ public class DefaultBulkController implements BulkController {
|
||||||
|
|
||||||
private final BulkClient bulkClient;
|
private final BulkClient bulkClient;
|
||||||
|
|
||||||
private final BulkMetric bulkMetric;
|
|
||||||
|
|
||||||
private BulkProcessor bulkProcessor;
|
private BulkProcessor bulkProcessor;
|
||||||
|
|
||||||
private BulkListener bulkListener;
|
private DefaultBulkListener bulkListener;
|
||||||
|
|
||||||
private long maxWaitTime;
|
private long maxWaitTime;
|
||||||
|
|
||||||
|
@ -39,13 +36,11 @@ public class DefaultBulkController implements BulkController {
|
||||||
|
|
||||||
public DefaultBulkController(BulkClient bulkClient) {
|
public DefaultBulkController(BulkClient bulkClient) {
|
||||||
this.bulkClient = bulkClient;
|
this.bulkClient = bulkClient;
|
||||||
this.bulkMetric = new DefaultBulkMetric();
|
|
||||||
this.active = new AtomicBoolean(false);
|
this.active = new AtomicBoolean(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Settings settings) {
|
public void init(Settings settings) {
|
||||||
bulkMetric.init(settings);
|
|
||||||
String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(),
|
String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(),
|
||||||
Parameters.MAX_WAIT_BULK_RESPONSE.getString());
|
Parameters.MAX_WAIT_BULK_RESPONSE.getString());
|
||||||
TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr,
|
TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr,
|
||||||
|
@ -68,7 +63,7 @@ public class DefaultBulkController implements BulkController {
|
||||||
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
|
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
|
||||||
int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(),
|
int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(),
|
||||||
Parameters.RESPONSE_TIME_COUNT.getInteger());
|
Parameters.RESPONSE_TIME_COUNT.getInteger());
|
||||||
this.bulkListener = new DefaultBulkListener(this, bulkMetric,
|
this.bulkListener = new DefaultBulkListener(this,
|
||||||
enableBulkLogging, failOnBulkError, responseTimeCount);
|
enableBulkLogging, failOnBulkError, responseTimeCount);
|
||||||
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
|
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
|
||||||
.setBulkActions(maxActionsPerRequest)
|
.setBulkActions(maxActionsPerRequest)
|
||||||
|
@ -96,7 +91,7 @@ public class DefaultBulkController implements BulkController {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BulkMetric getBulkMetric() {
|
public BulkMetric getBulkMetric() {
|
||||||
return bulkMetric;
|
return bulkListener != null ? bulkListener.getBulkMetric() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -123,7 +118,6 @@ public class DefaultBulkController implements BulkController {
|
||||||
public void bulkIndex(IndexRequest indexRequest) {
|
public void bulkIndex(IndexRequest indexRequest) {
|
||||||
ensureActiveAndBulk();
|
ensureActiveAndBulk();
|
||||||
try {
|
try {
|
||||||
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
|
|
||||||
bulkProcessor.add(indexRequest);
|
bulkProcessor.add(indexRequest);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
|
@ -137,7 +131,6 @@ public class DefaultBulkController implements BulkController {
|
||||||
public void bulkDelete(DeleteRequest deleteRequest) {
|
public void bulkDelete(DeleteRequest deleteRequest) {
|
||||||
ensureActiveAndBulk();
|
ensureActiveAndBulk();
|
||||||
try {
|
try {
|
||||||
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
|
|
||||||
bulkProcessor.add(deleteRequest);
|
bulkProcessor.add(deleteRequest);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
|
@ -151,7 +144,6 @@ public class DefaultBulkController implements BulkController {
|
||||||
public void bulkUpdate(UpdateRequest updateRequest) {
|
public void bulkUpdate(UpdateRequest updateRequest) {
|
||||||
ensureActiveAndBulk();
|
ensureActiveAndBulk();
|
||||||
try {
|
try {
|
||||||
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
|
|
||||||
bulkProcessor.add(updateRequest);
|
bulkProcessor.add(updateRequest);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
|
@ -201,7 +193,6 @@ public class DefaultBulkController implements BulkController {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
bulkMetric.close();
|
|
||||||
flush();
|
flush();
|
||||||
bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit);
|
bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit);
|
||||||
if (bulkProcessor != null) {
|
if (bulkProcessor != null) {
|
||||||
|
|
|
@ -9,8 +9,10 @@ import org.xbib.elx.api.BulkController;
|
||||||
import org.xbib.elx.api.BulkListener;
|
import org.xbib.elx.api.BulkListener;
|
||||||
import org.xbib.elx.api.BulkMetric;
|
import org.xbib.elx.api.BulkMetric;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.LongSummaryStatistics;
|
import java.util.LongSummaryStatistics;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.LongStream;
|
import java.util.stream.LongStream;
|
||||||
|
|
||||||
public class DefaultBulkListener implements BulkListener {
|
public class DefaultBulkListener implements BulkListener {
|
||||||
|
@ -27,21 +29,25 @@ public class DefaultBulkListener implements BulkListener {
|
||||||
|
|
||||||
private Throwable lastBulkError;
|
private Throwable lastBulkError;
|
||||||
|
|
||||||
private final int responseTimeCount;
|
private final int ringBufferSize;
|
||||||
|
|
||||||
private final LastResponseTimes responseTimes;
|
private final LongRingBuffer ringBuffer;
|
||||||
|
|
||||||
public DefaultBulkListener(BulkController bulkController,
|
public DefaultBulkListener(BulkController bulkController,
|
||||||
BulkMetric bulkMetric,
|
|
||||||
boolean isBulkLoggingEnabled,
|
boolean isBulkLoggingEnabled,
|
||||||
boolean failOnError,
|
boolean failOnError,
|
||||||
int responseTimeCount) {
|
int ringBufferSize) {
|
||||||
this.bulkController = bulkController;
|
this.bulkController = bulkController;
|
||||||
this.bulkMetric = bulkMetric;
|
|
||||||
this.isBulkLoggingEnabled = isBulkLoggingEnabled;
|
this.isBulkLoggingEnabled = isBulkLoggingEnabled;
|
||||||
this.failOnError = failOnError;
|
this.failOnError = failOnError;
|
||||||
this.responseTimeCount = responseTimeCount;
|
this.ringBufferSize = ringBufferSize;
|
||||||
this.responseTimes = new LastResponseTimes(responseTimeCount);
|
this.ringBuffer = new LongRingBuffer(ringBufferSize);
|
||||||
|
this.bulkMetric = new DefaultBulkMetric();
|
||||||
|
bulkMetric.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public BulkMetric getBulkMetric() {
|
||||||
|
return bulkMetric;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -66,14 +72,17 @@ public class DefaultBulkListener implements BulkListener {
|
||||||
long l = bulkMetric.getCurrentIngest().getCount();
|
long l = bulkMetric.getCurrentIngest().getCount();
|
||||||
bulkMetric.getCurrentIngest().dec();
|
bulkMetric.getCurrentIngest().dec();
|
||||||
bulkMetric.getSucceeded().inc(response.getItems().length);
|
bulkMetric.getSucceeded().inc(response.getItems().length);
|
||||||
if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) {
|
if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) {
|
||||||
LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics();
|
LongSummaryStatistics stat1 = ringBuffer.longStreamValues1().summaryStatistics();
|
||||||
|
LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics();
|
||||||
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
||||||
logger.debug("bulk response millis: avg = " + stat.getAverage() +
|
logger.debug("bulk response millis: avg = " + stat1.getAverage() +
|
||||||
" min =" + stat.getMin() +
|
" min = " + stat1.getMin() +
|
||||||
" max = " + stat.getMax() +
|
" max = " + stat1.getMax() +
|
||||||
" actions = " + bulkController.getBulkProcessor().getBulkActions() +
|
" size: avg = " + stat2.getAverage() +
|
||||||
" size = " + bulkController.getBulkProcessor().getBulkSize());
|
" min = " + stat2.getMin() +
|
||||||
|
" max = " + stat2.getMax() +
|
||||||
|
" throughput: " + (stat2.getAverage() / stat1.getAverage()) + " bytes/ms");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int n = 0;
|
int n = 0;
|
||||||
|
@ -122,29 +131,41 @@ public class DefaultBulkListener implements BulkListener {
|
||||||
return lastBulkError;
|
return lastBulkError;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LastResponseTimes {
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
bulkMetric.close();
|
||||||
|
}
|
||||||
|
|
||||||
private final Long[] values;
|
private static class LongRingBuffer {
|
||||||
|
|
||||||
|
private final Long[] values1, values2;
|
||||||
|
|
||||||
private final int limit;
|
private final int limit;
|
||||||
|
|
||||||
private int index;
|
private final AtomicInteger index;
|
||||||
|
|
||||||
public LastResponseTimes(int limit) {
|
public LongRingBuffer(int limit) {
|
||||||
this.values = new Long[limit];
|
this.values1 = new Long[limit];
|
||||||
Arrays.fill(values, -1L);
|
this.values2 = new Long[limit];
|
||||||
|
Arrays.fill(values1, -1L);
|
||||||
|
Arrays.fill(values2, -1L);
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
this.index = 0;
|
this.index = new AtomicInteger();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int add(Long value) {
|
public int add(Long v1, Long v2) {
|
||||||
int i = index++ % limit;
|
int i = index.incrementAndGet() % limit;
|
||||||
values[i] = value;
|
values1[i] = v1;
|
||||||
|
values2[i] = v2;
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
public LongStream longStream() {
|
public LongStream longStreamValues1() {
|
||||||
return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue);
|
return Arrays.stream(values1).filter(v -> v != -1L).mapToLong(Long::longValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public LongStream longStreamValues2() {
|
||||||
|
return Arrays.stream(values2).filter(v -> v != -1L).mapToLong(Long::longValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package org.xbib.elx.common;
|
package org.xbib.elx.common;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.xbib.elx.api.BulkMetric;
|
import org.xbib.elx.api.BulkMetric;
|
||||||
import org.xbib.metrics.api.Count;
|
import org.xbib.metrics.api.Count;
|
||||||
import org.xbib.metrics.api.Metered;
|
import org.xbib.metrics.api.Metered;
|
||||||
|
@ -39,12 +38,6 @@ public class DefaultBulkMetric implements BulkMetric {
|
||||||
failed = new CountMetric();
|
failed = new CountMetric();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void init(Settings settings) {
|
|
||||||
start();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void markTotalIngest(long n) {
|
public void markTotalIngest(long n) {
|
||||||
totalIngest.mark(n);
|
totalIngest.mark(n);
|
||||||
|
|
|
@ -10,11 +10,11 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|
||||||
import org.xbib.elx.api.BulkListener;
|
import org.xbib.elx.api.BulkListener;
|
||||||
import org.xbib.elx.api.BulkProcessor;
|
import org.xbib.elx.api.BulkProcessor;
|
||||||
import org.xbib.elx.api.BulkRequestHandler;
|
import org.xbib.elx.api.BulkRequestHandler;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
@ -37,8 +37,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
|
|
||||||
private final ScheduledFuture<?> scheduledFuture;
|
private final ScheduledFuture<?> scheduledFuture;
|
||||||
|
|
||||||
private final AtomicLong executionIdGen;
|
|
||||||
|
|
||||||
private final BulkRequestHandler bulkRequestHandler;
|
private final BulkRequestHandler bulkRequestHandler;
|
||||||
|
|
||||||
private BulkRequest bulkRequest;
|
private BulkRequest bulkRequest;
|
||||||
|
@ -56,7 +54,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
int bulkActions,
|
int bulkActions,
|
||||||
ByteSizeValue bulkSize,
|
ByteSizeValue bulkSize,
|
||||||
TimeValue flushInterval) {
|
TimeValue flushInterval) {
|
||||||
this.executionIdGen = new AtomicLong();
|
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
this.bulkActions = bulkActions;
|
this.bulkActions = bulkActions;
|
||||||
this.bulkSize = bulkSize.getBytes();
|
this.bulkSize = bulkSize.getBytes();
|
||||||
|
@ -102,79 +99,20 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
return bulkSize;
|
return bulkSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public BulkRequestHandler getBulkRequestHandler() {
|
||||||
* Wait for bulk request handler with flush.
|
return bulkRequestHandler;
|
||||||
* @param timeout the timeout value
|
|
||||||
* @param unit the timeout unit
|
|
||||||
* @return true is method was successful, false if timeout
|
|
||||||
* @throws InterruptedException if timeout
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
|
|
||||||
Objects.requireNonNull(unit, "A time unit is required for awaitFlush() but null");
|
|
||||||
if (closed) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// flush
|
|
||||||
if (bulkRequest.numberOfActions() > 0) {
|
|
||||||
execute();
|
|
||||||
}
|
|
||||||
// wait for all bulk responses
|
|
||||||
return bulkRequestHandler.close(timeout, unit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called
|
|
||||||
* once as the last action of a bulk processor.
|
|
||||||
*
|
|
||||||
* If concurrent requests are not enabled, returns {@code true} immediately.
|
|
||||||
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then
|
|
||||||
* returns {@code true},
|
|
||||||
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
|
|
||||||
*
|
|
||||||
* @param timeout The maximum time to wait for the bulk requests to complete
|
|
||||||
* @param unit The time unit of the {@code timeout} argument
|
|
||||||
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the
|
|
||||||
* bulk requests completed
|
|
||||||
* @throws InterruptedException If the current thread is interrupted
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
|
public synchronized void add(ActionRequest<?> request) {
|
||||||
Objects.requireNonNull(unit, "A time unit is required for awaitCLose() but null");
|
|
||||||
if (closed) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
closed = true;
|
|
||||||
if (scheduledFuture != null) {
|
|
||||||
FutureUtils.cancel(scheduledFuture);
|
|
||||||
scheduler.shutdown();
|
|
||||||
}
|
|
||||||
if (bulkRequest.numberOfActions() > 0) {
|
|
||||||
execute();
|
|
||||||
}
|
|
||||||
return bulkRequestHandler.close(timeout, unit);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds either a delete or an index request.
|
|
||||||
*
|
|
||||||
* @param request request
|
|
||||||
* @return his bulk processor
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public synchronized DefaultBulkProcessor add(ActionRequest<?> request) {
|
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
bulkRequest.add(request);
|
bulkRequest.add(request);
|
||||||
if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) ||
|
if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) ||
|
||||||
(bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) {
|
(bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) {
|
||||||
execute();
|
execute();
|
||||||
}
|
}
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush pending delete or index requests.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void flush() {
|
public synchronized void flush() {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
|
@ -183,14 +121,32 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException {
|
||||||
|
if (closed) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (bulkRequest.numberOfActions() > 0) {
|
||||||
|
execute();
|
||||||
|
}
|
||||||
|
return bulkRequestHandler.flush(timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void close() throws IOException {
|
||||||
try {
|
try {
|
||||||
// 0 = immediate close
|
if (closed) {
|
||||||
awaitClose(0, TimeUnit.NANOSECONDS);
|
return;
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
if (scheduledFuture != null) {
|
||||||
|
scheduledFuture.cancel(true);
|
||||||
|
scheduler.shutdown();
|
||||||
|
}
|
||||||
|
if (bulkRequest.numberOfActions() > 0) {
|
||||||
|
execute();
|
||||||
|
}
|
||||||
|
bulkRequestHandler.flush(0, TimeUnit.NANOSECONDS);
|
||||||
} catch (InterruptedException exc) {
|
} catch (InterruptedException exc) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
@ -204,9 +160,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
|
|
||||||
private void execute() {
|
private void execute() {
|
||||||
BulkRequest myBulkRequest = this.bulkRequest;
|
BulkRequest myBulkRequest = this.bulkRequest;
|
||||||
long executionId = executionIdGen.incrementAndGet();
|
|
||||||
this.bulkRequest = new BulkRequest();
|
this.bulkRequest = new BulkRequest();
|
||||||
this.bulkRequestHandler.execute(myBulkRequest, executionId);
|
this.bulkRequestHandler.execute(myBulkRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -278,7 +233,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
|
* Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
|
||||||
* {@code 5mb}. Can be set to {@code -1} to disable it.
|
* {@code 1mb}. Can be set to {@code -1} to disable it.
|
||||||
*
|
*
|
||||||
* @param bulkSize bulk size
|
* @param bulkSize bulk size
|
||||||
* @return this builder
|
* @return this builder
|
||||||
|
@ -319,10 +274,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (bulkRequest.numberOfActions() == 0) {
|
if (bulkRequest.numberOfActions() > 0) {
|
||||||
return;
|
execute();
|
||||||
}
|
}
|
||||||
execute();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -333,14 +287,17 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
|
|
||||||
private final BulkListener bulkListener;
|
private final BulkListener bulkListener;
|
||||||
|
|
||||||
|
private final AtomicLong executionIdGen;
|
||||||
|
|
||||||
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
|
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
|
||||||
Objects.requireNonNull(bulkListener, "A listener is required for SyncBulkRequestHandler but null");
|
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.bulkListener = bulkListener;
|
this.bulkListener = bulkListener;
|
||||||
|
this.executionIdGen = new AtomicLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(BulkRequest bulkRequest, long executionId) {
|
public void execute(BulkRequest bulkRequest) {
|
||||||
|
long executionId = executionIdGen.incrementAndGet();
|
||||||
boolean afterCalled = false;
|
boolean afterCalled = false;
|
||||||
try {
|
try {
|
||||||
bulkListener.beforeBulk(executionId, bulkRequest);
|
bulkListener.beforeBulk(executionId, bulkRequest);
|
||||||
|
@ -355,9 +312,22 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean close(long timeout, TimeUnit unit) {
|
public boolean flush(long timeout, TimeUnit unit) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPermits() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void increase() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reduce() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class AsyncBulkRequestHandler implements BulkRequestHandler {
|
private static class AsyncBulkRequestHandler implements BulkRequestHandler {
|
||||||
|
@ -366,22 +336,27 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
|
|
||||||
private final BulkListener bulkListener;
|
private final BulkListener bulkListener;
|
||||||
|
|
||||||
private final Semaphore semaphore;
|
private final ResizeableSemaphore semaphore;
|
||||||
|
|
||||||
private final int concurrentRequests;
|
private final AtomicLong executionIdGen;
|
||||||
|
|
||||||
private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) {
|
private int permits;
|
||||||
Objects.requireNonNull(bulkListener, "A listener is required for AsyncBulkRequestHandler but null");
|
|
||||||
|
private AsyncBulkRequestHandler(ElasticsearchClient client,
|
||||||
|
BulkListener bulkListener,
|
||||||
|
int permits) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.bulkListener = bulkListener;
|
this.bulkListener = bulkListener;
|
||||||
this.concurrentRequests = concurrentRequests;
|
this.permits = permits;
|
||||||
this.semaphore = new Semaphore(concurrentRequests);
|
this.semaphore = new ResizeableSemaphore(permits);
|
||||||
|
this.executionIdGen = new AtomicLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(final BulkRequest bulkRequest, final long executionId) {
|
public void execute(BulkRequest bulkRequest) {
|
||||||
boolean bulkRequestSetupSuccessful = false;
|
boolean bulkRequestSetupSuccessful = false;
|
||||||
boolean acquired = false;
|
boolean acquired = false;
|
||||||
|
long executionId = executionIdGen.incrementAndGet();
|
||||||
try {
|
try {
|
||||||
bulkListener.beforeBulk(executionId, bulkRequest);
|
bulkListener.beforeBulk(executionId, bulkRequest);
|
||||||
semaphore.acquire();
|
semaphore.acquire();
|
||||||
|
@ -413,19 +388,49 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||||
} finally {
|
} finally {
|
||||||
if (!bulkRequestSetupSuccessful && acquired) {
|
if (!bulkRequestSetupSuccessful && acquired) {
|
||||||
// if we fail on client.bulk() release the semaphore
|
|
||||||
semaphore.release();
|
semaphore.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
|
public boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException {
|
||||||
if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
|
bulkListener.close();
|
||||||
semaphore.release(concurrentRequests);
|
if (semaphore.tryAcquire(permits, timeout, unit)) {
|
||||||
|
semaphore.release(permits);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPermits() {
|
||||||
|
return permits;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void increase() {
|
||||||
|
semaphore.release(1);
|
||||||
|
this.permits++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reduce() {
|
||||||
|
semaphore.reducePermits(1);
|
||||||
|
this.permits--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
private static class ResizeableSemaphore extends Semaphore {
|
||||||
|
|
||||||
|
ResizeableSemaphore(int permits) {
|
||||||
|
super(permits, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void reducePermits(int reduction) {
|
||||||
|
super.reducePermits(reduction);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,13 +130,12 @@ class BulkClientTest {
|
||||||
@Test
|
@Test
|
||||||
void testThreadedRandomDocs() throws Exception {
|
void testThreadedRandomDocs() throws Exception {
|
||||||
int maxthreads = Runtime.getRuntime().availableProcessors();
|
int maxthreads = Runtime.getRuntime().availableProcessors();
|
||||||
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
||||||
final long actions = ACTIONS;
|
final long actions = ACTIONS;
|
||||||
logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
|
long timeout = 120L;
|
||||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||||
.put(helper.getNodeSettings())
|
.put(helper.getNodeSettings())
|
||||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads)
|
|
||||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest)
|
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest)
|
||||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s")
|
.put(Parameters.FLUSH_INTERVAL.getName(), "60s")
|
||||||
.build()) {
|
.build()) {
|
||||||
|
@ -155,17 +154,12 @@ class BulkClientTest {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
logger.info("waiting for latch...");
|
if (latch.await(timeout, TimeUnit.SECONDS)) {
|
||||||
if (latch.await(30L, TimeUnit.SECONDS)) {
|
bulkClient.waitForResponses(timeout, TimeUnit.SECONDS);
|
||||||
logger.info("flush...");
|
|
||||||
bulkClient.flush();
|
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
|
||||||
logger.info("got all responses, executor service shutdown...");
|
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
executorService.awaitTermination(30L, TimeUnit.SECONDS);
|
executorService.awaitTermination(timeout, TimeUnit.SECONDS);
|
||||||
logger.info("pool is shut down");
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("latch timeout");
|
logger.error("latch timeout!");
|
||||||
}
|
}
|
||||||
bulkClient.stopBulk(indexDefinition);
|
bulkClient.stopBulk(indexDefinition);
|
||||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||||
|
|
|
@ -29,9 +29,9 @@ class BulkClientTest {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName());
|
private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName());
|
||||||
|
|
||||||
private static final Long ACTIONS = 10000L;
|
private static final Long ACTIONS = 100000L;
|
||||||
|
|
||||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10000L;
|
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
|
||||||
|
|
||||||
private final TestExtension.Helper helper;
|
private final TestExtension.Helper helper;
|
||||||
|
|
||||||
|
@ -128,13 +128,12 @@ class BulkClientTest {
|
||||||
@Test
|
@Test
|
||||||
void testThreadedRandomDocs() {
|
void testThreadedRandomDocs() {
|
||||||
int maxthreads = Runtime.getRuntime().availableProcessors();
|
int maxthreads = Runtime.getRuntime().availableProcessors();
|
||||||
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
||||||
final long actions = ACTIONS;
|
final long actions = ACTIONS;
|
||||||
logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
|
long timeout = 120L;
|
||||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||||
.put(helper.getTransportSettings())
|
.put(helper.getTransportSettings())
|
||||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads * 2)
|
|
||||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest)
|
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest)
|
||||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s")
|
.put(Parameters.FLUSH_INTERVAL.getName(), "60s")
|
||||||
.put(Parameters.ENABLE_BULK_LOGGING.getName(), Boolean.TRUE)
|
.put(Parameters.ENABLE_BULK_LOGGING.getName(), Boolean.TRUE)
|
||||||
|
@ -157,17 +156,12 @@ class BulkClientTest {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
logger.info("waiting for latch...");
|
if (latch.await(timeout, TimeUnit.SECONDS)) {
|
||||||
if (latch.await(30L, TimeUnit.SECONDS)) {
|
bulkClient.waitForResponses(timeout, TimeUnit.SECONDS);
|
||||||
logger.info("flush...");
|
|
||||||
bulkClient.flush();
|
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
|
||||||
logger.info("got all responses, executor service shutdown...");
|
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
executorService.awaitTermination(30L, TimeUnit.SECONDS);
|
executorService.awaitTermination(timeout, TimeUnit.SECONDS);
|
||||||
logger.info("pool is shut down");
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("latch timeout");
|
logger.error("latch timeout!");
|
||||||
}
|
}
|
||||||
bulkClient.stopBulk(indexDefinition);
|
bulkClient.stopBulk(indexDefinition);
|
||||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||||
|
|
|
@ -13,6 +13,8 @@ test {
|
||||||
useJUnitPlatform()
|
useJUnitPlatform()
|
||||||
failFast = true
|
failFast = true
|
||||||
jvmArgs = [
|
jvmArgs = [
|
||||||
|
'-Xmx2g',
|
||||||
|
'-Xms2g',
|
||||||
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
|
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
|
||||||
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
|
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
|
||||||
'--add-opens=java.base/java.nio=ALL-UNNAMED'
|
'--add-opens=java.base/java.nio=ALL-UNNAMED'
|
||||||
|
|
Loading…
Reference in a new issue