|
|
|
@ -1,26 +1,29 @@
|
|
|
|
|
package org.xbib.elx.common;
|
|
|
|
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
|
|
import org.elasticsearch.action.ActionRequest;
|
|
|
|
|
import org.elasticsearch.action.bulk.BulkAction;
|
|
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
|
|
import org.elasticsearch.client.ElasticsearchClient;
|
|
|
|
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
|
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
|
|
import org.xbib.elx.api.BulkListener;
|
|
|
|
|
import org.xbib.elx.api.BulkClient;
|
|
|
|
|
import org.xbib.elx.api.BulkMetric;
|
|
|
|
|
import org.xbib.elx.api.BulkProcessor;
|
|
|
|
|
import org.xbib.elx.api.BulkRequestHandler;
|
|
|
|
|
import org.xbib.elx.api.IndexDefinition;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -29,343 +32,238 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
* (either based on number of actions, based on the size, or time), and
|
|
|
|
|
* to easily control the number of concurrent bulk
|
|
|
|
|
* requests allowed to be executed in parallel.
|
|
|
|
|
* In order to create a new bulk processor, use the {@link Builder}.
|
|
|
|
|
*/
|
|
|
|
|
public class DefaultBulkProcessor implements BulkProcessor {
|
|
|
|
|
|
|
|
|
|
private final ScheduledThreadPoolExecutor scheduler;
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class);
|
|
|
|
|
|
|
|
|
|
private final ScheduledFuture<?> scheduledFuture;
|
|
|
|
|
private final BulkClient bulkClient;
|
|
|
|
|
|
|
|
|
|
private final BulkRequestHandler bulkRequestHandler;
|
|
|
|
|
private final AtomicBoolean enabled;
|
|
|
|
|
|
|
|
|
|
private BulkRequest bulkRequest;
|
|
|
|
|
private final ElasticsearchClient client;
|
|
|
|
|
|
|
|
|
|
private long bulkSize;
|
|
|
|
|
private final DefaultBulkListener bulkListener;
|
|
|
|
|
|
|
|
|
|
private int bulkActions;
|
|
|
|
|
private final ScheduledThreadPoolExecutor scheduler;
|
|
|
|
|
|
|
|
|
|
private volatile boolean closed;
|
|
|
|
|
private ScheduledFuture<?> scheduledFuture;
|
|
|
|
|
|
|
|
|
|
private DefaultBulkProcessor(ElasticsearchClient client,
|
|
|
|
|
BulkListener bulkListener,
|
|
|
|
|
String name,
|
|
|
|
|
int concurrentRequests,
|
|
|
|
|
int bulkActions,
|
|
|
|
|
ByteSizeValue bulkSize,
|
|
|
|
|
TimeValue flushInterval) {
|
|
|
|
|
this.closed = false;
|
|
|
|
|
this.bulkActions = bulkActions;
|
|
|
|
|
this.bulkSize = bulkSize.getBytes();
|
|
|
|
|
this.bulkRequest = new BulkRequest();
|
|
|
|
|
this.bulkRequestHandler = concurrentRequests == 0 ?
|
|
|
|
|
new SyncBulkRequestHandler(client, bulkListener) :
|
|
|
|
|
new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests);
|
|
|
|
|
if (flushInterval != null) {
|
|
|
|
|
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
|
|
|
|
|
EsExecutors.daemonThreadFactory(name != null ? "[" + name + "]" : "" + "bulk_processor"));
|
|
|
|
|
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
|
|
|
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
|
|
|
|
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(),
|
|
|
|
|
private BulkRequest bulkRequest;
|
|
|
|
|
|
|
|
|
|
private long maxBulkVolume;
|
|
|
|
|
|
|
|
|
|
private int maxBulkActions;
|
|
|
|
|
|
|
|
|
|
private final AtomicBoolean closed;
|
|
|
|
|
|
|
|
|
|
private final AtomicLong executionIdGen;
|
|
|
|
|
|
|
|
|
|
private ResizeableSemaphore semaphore;
|
|
|
|
|
|
|
|
|
|
private int permits;
|
|
|
|
|
|
|
|
|
|
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
|
|
|
|
|
this.bulkClient = bulkClient;
|
|
|
|
|
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(),
|
|
|
|
|
Parameters.MAX_ACTIONS_PER_REQUEST.getInteger());
|
|
|
|
|
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(),
|
|
|
|
|
Parameters.MAX_CONCURRENT_REQUESTS.getInteger());
|
|
|
|
|
String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(),
|
|
|
|
|
Parameters.FLUSH_INTERVAL.getString());
|
|
|
|
|
TimeValue flushInterval = TimeValue.parseTimeValue(flushIngestIntervalStr,
|
|
|
|
|
TimeValue.timeValueSeconds(30), "");
|
|
|
|
|
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(),
|
|
|
|
|
ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m"));
|
|
|
|
|
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(),
|
|
|
|
|
Parameters.ENABLE_BULK_LOGGING.getBoolean());
|
|
|
|
|
boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
|
|
|
|
|
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
|
|
|
|
|
int ringBufferSize = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(),
|
|
|
|
|
Parameters.RESPONSE_TIME_COUNT.getInteger());
|
|
|
|
|
this.client = bulkClient.getClient();
|
|
|
|
|
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2,
|
|
|
|
|
EsExecutors.daemonThreadFactory("elx-bulk-processor"));
|
|
|
|
|
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
|
|
|
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
|
|
|
|
if (flushInterval.millis() > 0L) {
|
|
|
|
|
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this::flush, flushInterval.millis(),
|
|
|
|
|
flushInterval.millis(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
this.bulkListener = new DefaultBulkListener(this, scheduler,
|
|
|
|
|
enableBulkLogging, failOnBulkError, ringBufferSize);
|
|
|
|
|
this.permits = maxConcurrentRequests;
|
|
|
|
|
this.maxBulkActions = maxActionsPerRequest;
|
|
|
|
|
this.maxBulkVolume = maxVolumePerRequest != null ? maxVolumePerRequest.getBytes() : -1;
|
|
|
|
|
this.bulkRequest = new BulkRequest();
|
|
|
|
|
this.closed = new AtomicBoolean(false);
|
|
|
|
|
this.enabled = new AtomicBoolean(false);
|
|
|
|
|
this.executionIdGen = new AtomicLong();
|
|
|
|
|
if (permits > 0) {
|
|
|
|
|
this.semaphore = new ResizeableSemaphore(permits);
|
|
|
|
|
}
|
|
|
|
|
if (logger.isInfoEnabled()) {
|
|
|
|
|
logger.info("bulk processor now active with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
|
|
|
|
|
"flushInterval = {} maxVolumePerRequest = {} " +
|
|
|
|
|
"bulk logging = {} fail on bulk error = {} " +
|
|
|
|
|
"logger debug = {} from settings = {}",
|
|
|
|
|
maxActionsPerRequest, maxConcurrentRequests,
|
|
|
|
|
flushInterval, maxVolumePerRequest,
|
|
|
|
|
enableBulkLogging, failOnBulkError,
|
|
|
|
|
logger.isDebugEnabled(), settings.toDelimitedString(','));
|
|
|
|
|
}
|
|
|
|
|
setEnabled(true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void setEnabled(boolean enabled) {
|
|
|
|
|
this.enabled.set(enabled);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
|
|
|
|
|
String indexName = indexDefinition.getFullIndexName();
|
|
|
|
|
int interval = indexDefinition.getStartBulkRefreshSeconds();
|
|
|
|
|
if (interval != 0) {
|
|
|
|
|
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
|
|
|
|
|
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
|
|
|
|
|
} else {
|
|
|
|
|
this.scheduler = null;
|
|
|
|
|
this.scheduledFuture = null;
|
|
|
|
|
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) {
|
|
|
|
|
Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null");
|
|
|
|
|
return new Builder(client, bulkListener);
|
|
|
|
|
@Override
|
|
|
|
|
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
|
|
|
|
|
String indexName = indexDefinition.getFullIndexName();
|
|
|
|
|
int interval = indexDefinition.getStopBulkRefreshSeconds();
|
|
|
|
|
flush();
|
|
|
|
|
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
|
|
|
|
|
if (interval != 0) {
|
|
|
|
|
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
|
|
|
|
|
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
|
|
|
|
|
} else {
|
|
|
|
|
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void setBulkActions(int bulkActions) {
|
|
|
|
|
this.bulkActions = bulkActions;
|
|
|
|
|
public void setMaxBulkActions(int bulkActions) {
|
|
|
|
|
this.maxBulkActions = bulkActions;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int getBulkActions() {
|
|
|
|
|
return bulkActions;
|
|
|
|
|
public int getMaxBulkActions() {
|
|
|
|
|
return maxBulkActions;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void setBulkSize(long bulkSize) {
|
|
|
|
|
this.bulkSize = bulkSize;
|
|
|
|
|
public void setMaxBulkVolume(long bulkSize) {
|
|
|
|
|
this.maxBulkVolume = bulkSize;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long getBulkSize() {
|
|
|
|
|
return bulkSize;
|
|
|
|
|
public long getMaxBulkVolume() {
|
|
|
|
|
return maxBulkVolume;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public BulkRequestHandler getBulkRequestHandler() {
|
|
|
|
|
return bulkRequestHandler;
|
|
|
|
|
@Override
|
|
|
|
|
public BulkMetric getBulkMetric() {
|
|
|
|
|
return bulkListener.getBulkMetric();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public synchronized void add(ActionRequest<?> request) {
|
|
|
|
|
ensureOpen();
|
|
|
|
|
bulkRequest.add(request);
|
|
|
|
|
if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) ||
|
|
|
|
|
(bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) {
|
|
|
|
|
execute();
|
|
|
|
|
}
|
|
|
|
|
public Throwable getLastBulkError() {
|
|
|
|
|
return bulkListener.getLastBulkError();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public synchronized void flush() {
|
|
|
|
|
ensureOpen();
|
|
|
|
|
if (bulkRequest.numberOfActions() > 0) {
|
|
|
|
|
public synchronized void add(ActionRequest<?> request) {
|
|
|
|
|
ensureOpenAndActive();
|
|
|
|
|
bulkRequest.add(request);
|
|
|
|
|
if ((maxBulkActions != -1 && bulkRequest.numberOfActions() >= maxBulkActions) ||
|
|
|
|
|
(maxBulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= maxBulkVolume)) {
|
|
|
|
|
execute();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException, IOException {
|
|
|
|
|
if (closed) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
public synchronized void flush() {
|
|
|
|
|
ensureOpenAndActive();
|
|
|
|
|
if (bulkRequest.numberOfActions() > 0) {
|
|
|
|
|
execute();
|
|
|
|
|
}
|
|
|
|
|
return bulkRequestHandler.flush(timeout, unit);
|
|
|
|
|
// do not drain semaphore
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public synchronized void close() throws IOException {
|
|
|
|
|
public synchronized boolean waitForBulkResponses(long timeout, TimeUnit unit) {
|
|
|
|
|
try {
|
|
|
|
|
if (closed) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
closed = true;
|
|
|
|
|
if (scheduledFuture != null) {
|
|
|
|
|
scheduledFuture.cancel(true);
|
|
|
|
|
scheduler.shutdown();
|
|
|
|
|
if (closed.get()) {
|
|
|
|
|
// silently skip closed condition
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
if (bulkRequest.numberOfActions() > 0) {
|
|
|
|
|
execute();
|
|
|
|
|
}
|
|
|
|
|
bulkRequestHandler.flush(0, TimeUnit.NANOSECONDS);
|
|
|
|
|
} catch (InterruptedException exc) {
|
|
|
|
|
return drainSemaphore(timeout, unit);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
logger.error("interrupted while waiting for bulk responses");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void ensureOpen() {
|
|
|
|
|
if (closed) {
|
|
|
|
|
throw new IllegalStateException("bulk processor already closed");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void execute() {
|
|
|
|
|
BulkRequest myBulkRequest = this.bulkRequest;
|
|
|
|
|
this.bulkRequest = new BulkRequest();
|
|
|
|
|
this.bulkRequestHandler.execute(myBulkRequest);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A builder used to create a build an instance of a bulk processor.
|
|
|
|
|
*/
|
|
|
|
|
public static class Builder {
|
|
|
|
|
|
|
|
|
|
private final ElasticsearchClient client;
|
|
|
|
|
|
|
|
|
|
private final BulkListener bulkListener;
|
|
|
|
|
|
|
|
|
|
private String name;
|
|
|
|
|
|
|
|
|
|
private int concurrentRequests = 1;
|
|
|
|
|
|
|
|
|
|
private int bulkActions = 1000;
|
|
|
|
|
|
|
|
|
|
private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB);
|
|
|
|
|
|
|
|
|
|
private TimeValue flushInterval = null;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Creates a builder of bulk processor with the client to use and the listener that will be used
|
|
|
|
|
* to be notified on the completion of bulk requests.
|
|
|
|
|
*
|
|
|
|
|
* @param client the client
|
|
|
|
|
* @param bulkListener the listener
|
|
|
|
|
*/
|
|
|
|
|
Builder(ElasticsearchClient client, BulkListener bulkListener) {
|
|
|
|
|
this.client = client;
|
|
|
|
|
this.bulkListener = bulkListener;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets an optional name to identify this bulk processor.
|
|
|
|
|
*
|
|
|
|
|
* @param name name
|
|
|
|
|
* @return this builder
|
|
|
|
|
*/
|
|
|
|
|
public Builder setName(String name) {
|
|
|
|
|
this.name = name;
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
|
|
|
|
|
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
|
|
|
|
|
* while accumulating new bulk requests. Defaults to {@code 1}.
|
|
|
|
|
*
|
|
|
|
|
* @param concurrentRequests maximum number of concurrent requests
|
|
|
|
|
* @return this builder
|
|
|
|
|
*/
|
|
|
|
|
public Builder setConcurrentRequests(int concurrentRequests) {
|
|
|
|
|
this.concurrentRequests = concurrentRequests;
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
|
|
|
|
|
* {@code 1000}. Can be set to {@code -1} to disable it.
|
|
|
|
|
*
|
|
|
|
|
* @param bulkActions bulk actions
|
|
|
|
|
* @return this builder
|
|
|
|
|
*/
|
|
|
|
|
public Builder setBulkActions(int bulkActions) {
|
|
|
|
|
this.bulkActions = bulkActions;
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
|
|
|
|
|
* {@code 1mb}. Can be set to {@code -1} to disable it.
|
|
|
|
|
*
|
|
|
|
|
* @param bulkSize bulk size
|
|
|
|
|
* @return this builder
|
|
|
|
|
*/
|
|
|
|
|
public Builder setBulkSize(ByteSizeValue bulkSize) {
|
|
|
|
|
this.bulkSize = bulkSize;
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
|
|
|
|
|
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
|
|
|
|
|
* can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions.
|
|
|
|
|
*
|
|
|
|
|
* @param flushInterval flush interval
|
|
|
|
|
* @return this builder
|
|
|
|
|
*/
|
|
|
|
|
public Builder setFlushInterval(TimeValue flushInterval) {
|
|
|
|
|
this.flushInterval = flushInterval;
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Builds a new bulk processor.
|
|
|
|
|
*
|
|
|
|
|
* @return a bulk processor
|
|
|
|
|
*/
|
|
|
|
|
public DefaultBulkProcessor build() {
|
|
|
|
|
return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private class Flush implements Runnable {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
synchronized (DefaultBulkProcessor.this) {
|
|
|
|
|
if (closed) {
|
|
|
|
|
return;
|
|
|
|
|
@Override
|
|
|
|
|
public synchronized void close() throws IOException {
|
|
|
|
|
if (closed.compareAndSet(false, true)) {
|
|
|
|
|
try {
|
|
|
|
|
if (scheduledFuture != null) {
|
|
|
|
|
scheduledFuture.cancel(true);
|
|
|
|
|
}
|
|
|
|
|
if (scheduler != null) {
|
|
|
|
|
scheduler.shutdown();
|
|
|
|
|
}
|
|
|
|
|
// like flush but without ensuring open
|
|
|
|
|
if (bulkRequest.numberOfActions() > 0) {
|
|
|
|
|
execute();
|
|
|
|
|
}
|
|
|
|
|
drainSemaphore(0L, TimeUnit.NANOSECONDS);
|
|
|
|
|
bulkListener.close();
|
|
|
|
|
} catch (InterruptedException exc) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class SyncBulkRequestHandler implements BulkRequestHandler {
|
|
|
|
|
|
|
|
|
|
private final ElasticsearchClient client;
|
|
|
|
|
|
|
|
|
|
private final BulkListener bulkListener;
|
|
|
|
|
|
|
|
|
|
private final AtomicLong executionIdGen;
|
|
|
|
|
|
|
|
|
|
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
|
|
|
|
|
this.client = client;
|
|
|
|
|
this.bulkListener = bulkListener;
|
|
|
|
|
this.executionIdGen = new AtomicLong();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void execute(BulkRequest bulkRequest) {
|
|
|
|
|
long executionId = executionIdGen.incrementAndGet();
|
|
|
|
|
private void execute() {
|
|
|
|
|
BulkRequest myBulkRequest = this.bulkRequest;
|
|
|
|
|
this.bulkRequest = new BulkRequest();
|
|
|
|
|
long executionId = executionIdGen.incrementAndGet();
|
|
|
|
|
if (semaphore == null) {
|
|
|
|
|
boolean afterCalled = false;
|
|
|
|
|
try {
|
|
|
|
|
bulkListener.beforeBulk(executionId, bulkRequest);
|
|
|
|
|
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
|
|
|
|
|
bulkListener.beforeBulk(executionId, myBulkRequest);
|
|
|
|
|
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, myBulkRequest).actionGet();
|
|
|
|
|
afterCalled = true;
|
|
|
|
|
bulkListener.afterBulk(executionId, bulkRequest, bulkResponse);
|
|
|
|
|
bulkListener.afterBulk(executionId, myBulkRequest, bulkResponse);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
if (!afterCalled) {
|
|
|
|
|
bulkListener.afterBulk(executionId, bulkRequest, e);
|
|
|
|
|
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean flush(long timeout, TimeUnit unit) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int getPermits() {
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void increase() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void reduce() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class AsyncBulkRequestHandler implements BulkRequestHandler {
|
|
|
|
|
|
|
|
|
|
private final ElasticsearchClient client;
|
|
|
|
|
|
|
|
|
|
private final BulkListener bulkListener;
|
|
|
|
|
|
|
|
|
|
private final ResizeableSemaphore semaphore;
|
|
|
|
|
|
|
|
|
|
private final AtomicLong executionIdGen;
|
|
|
|
|
|
|
|
|
|
private int permits;
|
|
|
|
|
|
|
|
|
|
private AsyncBulkRequestHandler(ElasticsearchClient client,
|
|
|
|
|
BulkListener bulkListener,
|
|
|
|
|
int permits) {
|
|
|
|
|
this.client = client;
|
|
|
|
|
this.bulkListener = bulkListener;
|
|
|
|
|
this.permits = permits;
|
|
|
|
|
this.semaphore = new ResizeableSemaphore(permits);
|
|
|
|
|
this.executionIdGen = new AtomicLong();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void execute(BulkRequest bulkRequest) {
|
|
|
|
|
} else {
|
|
|
|
|
boolean bulkRequestSetupSuccessful = false;
|
|
|
|
|
boolean acquired = false;
|
|
|
|
|
long executionId = executionIdGen.incrementAndGet();
|
|
|
|
|
try {
|
|
|
|
|
bulkListener.beforeBulk(executionId, bulkRequest);
|
|
|
|
|
bulkListener.beforeBulk(executionId, myBulkRequest);
|
|
|
|
|
semaphore.acquire();
|
|
|
|
|
acquired = true;
|
|
|
|
|
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
|
|
|
|
|
client.execute(BulkAction.INSTANCE, myBulkRequest, new ActionListener<>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void onResponse(BulkResponse response) {
|
|
|
|
|
try {
|
|
|
|
|
bulkListener.afterBulk(executionId, bulkRequest, response);
|
|
|
|
|
bulkListener.afterBulk(executionId, myBulkRequest, response);
|
|
|
|
|
} finally {
|
|
|
|
|
semaphore.release();
|
|
|
|
|
}
|
|
|
|
@ -374,7 +272,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|
|
|
|
@Override
|
|
|
|
|
public void onFailure(Throwable e) {
|
|
|
|
|
try {
|
|
|
|
|
bulkListener.afterBulk(executionId, bulkRequest, e);
|
|
|
|
|
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
|
|
|
|
} finally {
|
|
|
|
|
semaphore.release();
|
|
|
|
|
}
|
|
|
|
@ -383,45 +281,36 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|
|
|
|
bulkRequestSetupSuccessful = true;
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
bulkListener.afterBulk(executionId, bulkRequest, e);
|
|
|
|
|
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
bulkListener.afterBulk(executionId, bulkRequest, e);
|
|
|
|
|
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
|
|
|
|
} finally {
|
|
|
|
|
if (!bulkRequestSetupSuccessful && acquired) {
|
|
|
|
|
semaphore.release();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean flush(long timeout, TimeUnit unit) throws IOException, InterruptedException {
|
|
|
|
|
bulkListener.close();
|
|
|
|
|
if (semaphore.tryAcquire(permits, timeout, unit)) {
|
|
|
|
|
private boolean drainSemaphore(long timeValue, TimeUnit timeUnit) throws InterruptedException {
|
|
|
|
|
if (semaphore != null) {
|
|
|
|
|
if (semaphore.tryAcquire(permits, timeValue, timeUnit)) {
|
|
|
|
|
semaphore.release(permits);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int getPermits() {
|
|
|
|
|
return permits;
|
|
|
|
|
private void ensureOpenAndActive() {
|
|
|
|
|
if (closed.get()) {
|
|
|
|
|
throw new IllegalStateException("bulk processor is closed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void increase() {
|
|
|
|
|
semaphore.release(1);
|
|
|
|
|
this.permits++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void reduce() {
|
|
|
|
|
semaphore.reducePermits(1);
|
|
|
|
|
this.permits--;
|
|
|
|
|
if (!enabled.get()) {
|
|
|
|
|
throw new IllegalStateException("bulk processor is no longer enabled");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("serial")
|
|
|
|
|
private static class ResizeableSemaphore extends Semaphore {
|
|
|
|
|
|
|
|
|
|
ResizeableSemaphore(int permits) {
|
|
|
|
|