rewrite bulk over the limit condition
This commit is contained in:
parent
b9c8cb7b9a
commit
a9743dedf7
3 changed files with 11 additions and 26 deletions
|
@ -150,8 +150,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
* @return his bulk processor
|
* @return his bulk processor
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public DefaultBulkProcessor add(ActionRequest<?> request) {
|
public synchronized DefaultBulkProcessor add(ActionRequest<?> request) {
|
||||||
internalAdd(request);
|
ensureOpen();
|
||||||
|
bulkRequest.add(request);
|
||||||
|
if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) ||
|
||||||
|
(bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) {
|
||||||
|
execute();
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,33 +190,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void internalAdd(ActionRequest<?> request) {
|
|
||||||
ensureOpen();
|
|
||||||
bulkRequest.add(request);
|
|
||||||
executeIfNeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeIfNeeded() {
|
|
||||||
ensureOpen();
|
|
||||||
if (!isOverTheLimit()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
execute();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void execute() {
|
private void execute() {
|
||||||
final BulkRequest myBulkRequest = this.bulkRequest;
|
BulkRequest myBulkRequest = this.bulkRequest;
|
||||||
final long executionId = executionIdGen.incrementAndGet();
|
long executionId = executionIdGen.incrementAndGet();
|
||||||
this.bulkRequest = new BulkRequest();
|
this.bulkRequest = new BulkRequest();
|
||||||
this.bulkRequestHandler.execute(myBulkRequest, executionId);
|
this.bulkRequestHandler.execute(myBulkRequest, executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isOverTheLimit() {
|
|
||||||
return bulkActions != -1 &&
|
|
||||||
bulkRequest.numberOfActions() >= bulkActions ||
|
|
||||||
bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A builder used to create a build an instance of a bulk processor.
|
* A builder used to create a build an instance of a bulk processor.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -32,7 +32,7 @@ class BulkClientTest {
|
||||||
|
|
||||||
private static final Long ACTIONS = 10000L;
|
private static final Long ACTIONS = 10000L;
|
||||||
|
|
||||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10000L;
|
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
|
||||||
|
|
||||||
private final TestExtension.Helper helper;
|
private final TestExtension.Helper helper;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
group = org.xbib
|
group = org.xbib
|
||||||
name = elx
|
name = elx
|
||||||
version = 2.2.1.30
|
version = 2.2.1.31
|
||||||
|
|
||||||
gradle.wrapper.version = 6.6.1
|
gradle.wrapper.version = 6.6.1
|
||||||
xbib-metrics.version = 2.1.0
|
xbib-metrics.version = 2.1.0
|
||||||
|
|
Loading…
Reference in a new issue