|
|
|
@ -13,12 +13,15 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
|
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|
|
|
|
import org.xbib.elx.api.BulkProcessor;
|
|
|
|
|
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
import java.util.logging.Level;
|
|
|
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
|
|
|
|
@ -28,6 +31,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
*/
|
|
|
|
|
public class DefaultBulkProcessor implements BulkProcessor {
|
|
|
|
|
|
|
|
|
|
private static final Logger logger = Logger.getLogger(DefaultBulkProcessor.class.getName());
|
|
|
|
|
|
|
|
|
|
private final int bulkActions;
|
|
|
|
|
|
|
|
|
|
private final long bulkSize;
|
|
|
|
@ -421,6 +426,10 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
|
|
|
|
|
logger.log(Level.INFO, "semaphore=" + semaphore +
|
|
|
|
|
" concurrentRequests=" + concurrentRequests +
|
|
|
|
|
" timeout=" + timeout +
|
|
|
|
|
" unit=" + unit);
|
|
|
|
|
if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
|
|
|
|
|
semaphore.release(concurrentRequests);
|
|
|
|
|
return true;
|
|
|
|
|