API simplifications for bulk

Jörg Prante 2020-05-11 14:10:43 +02:00
parent a5c8dc8b5f
commit c03e47cf26
11 changed files with 277 additions and 284 deletions

View file

@ -14,6 +14,10 @@ public interface BulkController extends Closeable, Flushable {
void init(Settings settings);
void inactivate();
BulkMetric getBulkMetric();
Throwable getLastBulkError();
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
@ -21,16 +25,15 @@ public interface BulkController extends Closeable, Flushable {
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
void index(IndexRequest indexRequest);
void bulkIndex(IndexRequest indexRequest);
void delete(DeleteRequest deleteRequest);
void bulkDelete(DeleteRequest deleteRequest);
void update(UpdateRequest updateRequest);
void bulkUpdate(UpdateRequest updateRequest);
boolean waitForResponses(long timeout, TimeUnit timeUnit);
boolean waitForBulkResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
}
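The renamed controller methods line up with the new bulk-prefixed vocabulary. Below is a minimal usage sketch, not part of the commit: the extendedClient variable, the index name, the document id and source, and the refresh-interval and timeout values are all assumptions for illustration.

// Illustrative only: exercising the simplified BulkController API.
BulkController controller = extendedClient.getBulkController();
controller.startBulkMode("example-index", -1L, 30L);
controller.bulkIndex(new IndexRequest("example-index").id("1")
        .source("{\"field\":\"value\"}", XContentType.JSON));
if (controller.waitForBulkResponses(60L, TimeUnit.SECONDS)) {
    controller.stopBulkMode("example-index", 60L, TimeUnit.SECONDS);
}
if (controller.getLastBulkError() != null) {
    // inspect or log the failure
}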

View file

@ -0,0 +1,46 @@
package org.xbib.elx.api;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
/**
* A listener for the execution of bulk requests.
*/
public interface BulkListener {
/**
* Callback before the bulk is executed.
*
* @param executionId execution ID
* @param request request
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of a bulk request.
*
* @param executionId execution ID
* @param request request
* @param response response
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of a bulk request.
*
* Note that if an instance of <code>InterruptedException</code> is passed, request processing has been
* cancelled externally, and the thread's interruption status has been restored prior to calling this method.
*
* @param executionId execution ID
* @param request request
* @param failure failure
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
/**
* Get the last bulk error.
*
* @return the last bulk error
*/
Throwable getLastBulkError();
}
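For orientation, a minimal sketch of an implementation of this interface; the class name and the comments are illustrative and not part of the commit (the commit's own implementation is DefaultBulkListener, shown further below).

public class SimpleBulkListener implements BulkListener {

    private volatile Throwable lastBulkError;

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // inspect the request before it is sent, e.g. via request.numberOfActions()
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            // react to item-level failures reported in the response
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // remember the failure so getLastBulkError() can report it
        lastBulkError = failure;
    }

    @Override
    public Throwable getLastBulkError() {
        return lastBulkError;
    }
}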

View file

@ -1,6 +1,5 @@
package org.xbib.elx.api;
import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@ -8,8 +7,6 @@ import java.io.Closeable;
public interface BulkMetric extends Closeable {
void init(Settings settings);
Metered getTotalIngest();
Count getTotalIngestSizeInBytes();

View file

@ -1,8 +1,6 @@
package org.xbib.elx.api;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import java.io.Closeable;
import java.io.Flushable;
@ -16,47 +14,5 @@ public interface BulkProcessor extends Closeable, Flushable {
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}
/**
* A listener for the execution.
*/
interface Listener {
/**
* Callback before the bulk is executed.
*
* @param executionId execution ID
* @param request request
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of bulk request.
*
* @param executionId execution ID
* @param request request
* @param response response
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of bulk request.
*
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*
* @param executionId execution ID
* @param request request
* @param failure failure
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
}
BulkListener getBulkListener();
}
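With the inner Listener interface removed, callers reach the listener through the processor itself; a one-line sketch, assuming an existing bulkProcessor instance:

// The last bulk error is now reported via the processor's listener.
Throwable lastError = bulkProcessor.getBulkListener().getLastBulkError();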

View file

@ -0,0 +1,11 @@
package org.xbib.elx.api;
import org.elasticsearch.action.bulk.BulkRequest;
import java.util.concurrent.TimeUnit;
public interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}

View file

@ -36,13 +36,7 @@ public interface ExtendedClient extends Flushable, Closeable {
ElasticsearchClient getClient();
/**
* Get bulk metric.
* @return the bulk metric
*/
BulkMetric getBulkMetric();
/**
* Get buulk control.
* Get bulk control.
* @return the bulk control
*/
BulkController getBulkController();

View file

@ -59,9 +59,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
@ -72,7 +70,6 @@ import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition;
@ -114,8 +111,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private ElasticsearchClient client;
private BulkMetric bulkMetric;
private BulkController bulkController;
private final AtomicBoolean closed;
@ -173,11 +168,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return client;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public BulkController getBulkController() {
return bulkController;
@ -189,12 +179,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (client == null) {
client = createClient(settings);
}
if (bulkMetric == null) {
bulkMetric = new DefaultBulkMetric();
bulkMetric.init(settings);
}
if (bulkController == null) {
bulkController = new DefaultBulkController(this, bulkMetric);
bulkController = new DefaultBulkController(this);
bulkController.init(settings);
}
return this;
@ -209,12 +195,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void close() throws IOException {
ensureActive();
ensureClient();
if (closed.compareAndSet(false, true)) {
if (bulkMetric != null) {
logger.info("closing bulk metric");
bulkMetric.close();
}
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();
@ -225,7 +207,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getClusterName() {
ensureActive();
ensureClient();
try {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear();
ClusterStateResponse clusterStateResponse =
@ -245,7 +227,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException {
ensureActive();
ensureClient();
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
URL indexSettings = indexDefinition.getSettingsUrl();
if (indexSettings == null) {
@ -305,7 +287,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException {
ensureActive();
ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
@ -326,7 +308,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
ensureActive();
ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
@ -336,9 +318,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
createIndexRequest.settings(settings);
}
if (mapping != null) {
if (mapping.size() != 1) {
throw new IllegalArgumentException("mapping invalid, just use 'doc' for mapping");
}
createIndexRequest.mapping("_doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
@ -355,7 +334,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient deleteIndex(String index) {
ensureActive();
ensureClient();
if (index == null) {
logger.warn("no index name given to delete index");
return this;
@ -375,7 +354,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
if (bulkController != null) {
ensureActive();
ensureClient();
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
}
return this;
@ -384,7 +363,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException {
if (bulkController != null) {
ensureActive();
ensureClient();
bulkController.stopBulkMode(indexDefinition);
}
return this;
@ -393,7 +372,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
if (bulkController != null) {
ensureActive();
ensureClient();
bulkController.stopBulkMode(index, timeout, timeUnit);
}
return this;
@ -413,8 +392,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(IndexRequest indexRequest) {
ensureActive();
bulkController.index(indexRequest);
ensureClient();
bulkController.bulkIndex(indexRequest);
return this;
}
@ -425,13 +404,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient delete(DeleteRequest deleteRequest) {
ensureActive();
bulkController.delete(deleteRequest);
ensureClient();
bulkController.bulkDelete(deleteRequest);
return this;
}
@Override
public ExtendedClient update(String index, String id, BytesReference source) throws IOException {
public ExtendedClient update(String index, String id, BytesReference source) {
return update(new UpdateRequest().index(index).id(id)
.doc(source, XContentType.JSON));
}
@ -444,20 +423,20 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient update(UpdateRequest updateRequest) {
ensureActive();
bulkController.update(updateRequest);
ensureClient();
bulkController.bulkUpdate(updateRequest);
return this;
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
ensureActive();
return bulkController.waitForResponses(timeout, timeUnit);
ensureClient();
return bulkController.waitForBulkResponses(timeout, timeUnit);
}
@Override
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureClient();
ensureIndexGiven(index);
GetSettingsRequest settingsRequest = new GetSettingsRequest();
settingsRequest.indices(index);
@ -481,7 +460,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureClient();
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
@ -495,7 +474,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureClient();
try {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
@ -552,7 +531,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient flushIndex(String index) {
if (index != null) {
ensureActive();
ensureClient();
client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
}
return this;
@ -561,7 +540,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient refreshIndex(String index) {
if (index != null) {
ensureActive();
ensureClient();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet();
}
return this;
@ -569,10 +548,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String resolveMostRecentIndex(String alias) {
ensureActive();
if (alias == null) {
return null;
}
ensureClient();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@ -597,7 +576,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String resolveAlias(String alias) {
ensureActive();
ensureClient();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.blocks(false);
clusterStateRequest.metaData(true);
@ -639,7 +618,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public IndexShiftResult shiftIndex(String index, String fullIndexName,
List<String> additionalAliases, IndexAliasAdder adder) {
ensureActive();
ensureClient();
if (index == null) {
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
@ -727,7 +706,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (index.equals(fullIndexName)) {
return EMPTY_INDEX_PRUNE_RESULT;
}
ensureActive();
ensureClient();
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@ -779,7 +758,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public Long mostRecentDocument(String index, String timestampfieldname) {
ensureActive();
ensureClient();
SortBuilder<?> sort = SortBuilders
.fieldSort(timestampfieldname)
.order(SortOrder.DESC);
@ -868,7 +847,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
ensureActive();
ensureClient();
if (index == null) {
throw new IOException("no index name given");
}
@ -885,7 +864,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
}
private void ensureActive() {
private void ensureClient() {
if (this instanceof MockExtendedClient) {
return;
}
@ -917,7 +896,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
public void checkMapping(String index) {
ensureActive();
ensureClient();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
@ -946,7 +925,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (total.value > 0L) {
Map<String, Long> fields = new TreeMap<>();
Map<String, Object> root = mappingMetaData.getSourceAsMap();
checkMapping(index, type, "", "", root, fields);
checkMapping(index, "", "", root, fields);
AtomicInteger empty = new AtomicInteger();
Map<String, Long> map = sortByValue(fields);
map.forEach((key, value) -> {
@ -965,7 +944,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@SuppressWarnings("unchecked")
private void checkMapping(String index, String type,
private void checkMapping(String index,
String pathDef, String fieldName, Map<String, Object> map,
Map<String, Long> fields) {
String path = pathDef;
@ -990,7 +969,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
String fieldType = o instanceof String ? o.toString() : null;
// do not recurse into our custom field mapper
if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) {
checkMapping(index, type, path, key, child, fields);
checkMapping(index, path, key, child, fields);
}
} else if ("type".equals(key)) {
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);

View file

@ -2,9 +2,6 @@ package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -12,6 +9,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.ExtendedClient;
@ -33,27 +31,23 @@ public class DefaultBulkController implements BulkController {
private final BulkMetric bulkMetric;
private BulkProcessor bulkProcessor;
private final List<String> indexNames;
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals;
private long maxWaitTime;
private final long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private final TimeUnit maxWaitTimeUnit;
private BulkProcessor bulkProcessor;
private final AtomicBoolean active;
private BulkListener bulkListener;
private AtomicBoolean active;
private boolean enableBulkLogging;
public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
public DefaultBulkController(ExtendedClient client) {
this.client = client;
this.bulkMetric = bulkMetric;
this.bulkMetric = new DefaultBulkMetric();
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
@ -62,9 +56,14 @@ public class DefaultBulkController implements BulkController {
this.maxWaitTimeUnit = TimeUnit.SECONDS;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public Throwable getLastBulkError() {
return bulkListener.getLastBulkError();
return bulkProcessor.getBulkListener().getLastBulkError();
}
@Override
@ -78,22 +77,27 @@ public class DefaultBulkController implements BulkController {
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest"));
this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
Parameters.ENABLE_BULK_LOGGING.getValue());
this.bulkListener = new BulkListener();
BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest)
.build();
this.active.set(true);
if (logger.isInfoEnabled()) {
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {} bulk logging = {} logger debug = {} from settings = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest,
enableBulkLogging, logger.isDebugEnabled(), settings.toDelimitedString(','));
}
this.active.set(true);
}
@Override
public void inactivate() {
this.active.set(false);
}
@Override
@ -118,65 +122,49 @@ public class DefaultBulkController implements BulkController {
}
@Override
public void index(IndexRequest indexRequest) {
public void bulkIndex(IndexRequest indexRequest) {
ensureActiveAndBulk();
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id());
}
bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id());
bulkProcessor.add(indexRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of index failed: " + e.getMessage(), e);
}
inactivate();
}
}
@Override
public void delete(DeleteRequest deleteRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
public void bulkDelete(DeleteRequest deleteRequest) {
ensureActiveAndBulk();
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id());
}
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id());
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of delete failed: " + e.getMessage(), e);
}
inactivate();
}
}
@Override
public void update(UpdateRequest updateRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
public void bulkUpdate(UpdateRequest updateRequest) {
ensureActiveAndBulk();
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id());
}
bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id());
bulkProcessor.add(updateRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of update failed: " + e.getMessage(), e);
}
inactivate();
}
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
try {
return bulkProcessor.awaitFlush(timeout, timeUnit);
} catch (InterruptedException e) {
@ -195,7 +183,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
flush();
if (waitForResponses(timeout, timeUnit)) {
if (waitForBulkResponses(timeout, timeUnit)) {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
@ -217,6 +205,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void close() throws IOException {
flush();
bulkMetric.close();
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
@ -238,92 +227,5 @@ public class DefaultBulkController implements BulkController {
if (bulkProcessor == null) {
throw new UnsupportedOperationException("bulk processor not present");
}
if (bulkListener == null) {
throw new UnsupportedOperationException("bulk listener not present");
}
}
private class BulkListener implements DefaultBulkProcessor.Listener {
private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
private Throwable lastBulkError = null;
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
}
if (enableBulkLogging && logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) {
n++;
if (bulkMetric != null) {
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
}
if (enableBulkLogging && logger.isDebugEnabled() && bulkMetric != null) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
if (n > 0) {
if (enableBulkLogging && logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
}
} else {
if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
lastBulkError = failure;
active.set(false);
if (enableBulkLogging && logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);
}
}
Throwable getLastBulkError() {
return lastBulkError;
}
}
}

View file

@ -0,0 +1,95 @@
package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
public class DefaultBulkListener implements BulkListener {
private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName());
private final BulkController bulkController;
private final BulkMetric bulkMetric;
private final boolean isBulkLoggingEnabled;
private Throwable lastBulkError;
public DefaultBulkListener(BulkController bulkController,
BulkMetric bulkMetric,
boolean isBulkLoggingEnabled) {
this.bulkController = bulkController;
this.bulkMetric = bulkMetric;
this.isBulkLoggingEnabled = isBulkLoggingEnabled;
}
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
if (itemResponse.isFailed()) {
n++;
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
if (n > 0) {
if (isBulkLoggingEnabled && logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
}
} else {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
bulkMetric.getCurrentIngest().dec();
lastBulkError = failure;
if (logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);
}
bulkController.inactivate();
}
@Override
public Throwable getLastBulkError() {
return lastBulkError;
}
}
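Since the metric now lives behind the controller (BulkController.getBulkMetric()), a short sketch of reading the counters this listener maintains; the bulkController variable is assumed, the getter names come from the diff above.

BulkMetric metric = bulkController.getBulkMetric();
long submitted = metric.getSubmitted().getCount();
long succeeded = metric.getSucceeded().getCount();
long failed = metric.getFailed().getCount();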

View file

@ -1,6 +1,5 @@
package org.xbib.elx.common;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@ -37,10 +36,6 @@ public class DefaultBulkMetric implements BulkMetric {
submitted = new CountMetric();
succeeded = new CountMetric();
failed = new CountMetric();
}
@Override
public void init(Settings settings) {
start();
}

View file

@ -15,7 +15,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.BulkRequestHandler;
import java.util.Objects;
import java.util.concurrent.Executors;
@ -33,6 +35,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DefaultBulkProcessor implements BulkProcessor {
private final BulkListener bulkListener;
private final int bulkActions;
private final long bulkSize;
@ -49,16 +53,22 @@ public class DefaultBulkProcessor implements BulkProcessor {
private volatile boolean closed;
private DefaultBulkProcessor(ElasticsearchClient client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
private DefaultBulkProcessor(ElasticsearchClient client,
BulkListener bulkListener,
String name,
int concurrentRequests,
int bulkActions,
ByteSizeValue bulkSize,
TimeValue flushInterval) {
this.bulkListener = bulkListener;
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
new SyncBulkRequestHandler(client, listener) :
new AsyncBulkRequestHandler(client, listener, concurrentRequests);
new SyncBulkRequestHandler(client, bulkListener) :
new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests);
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
EsExecutors.daemonThreadFactory(Settings.EMPTY,
@ -73,10 +83,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
public static Builder builder(ElasticsearchClient client, Listener listener) {
public static Builder builder(ElasticsearchClient client,
BulkListener bulkListener) {
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null");
return new Builder(client, listener);
Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null");
return new Builder(client, bulkListener);
}
/**
@ -132,6 +143,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
return bulkRequestHandler.close(timeout, unit);
}
@Override
public BulkListener getBulkListener() {
return bulkListener;
}
/**
* Adds either a delete or an index request.
*
@ -216,7 +232,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
private final Listener listener;
private final BulkListener bulkListener;
private String name;
@ -233,11 +249,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
* to be notified on the completion of bulk requests.
*
* @param client the client
* @param listener the listener
* @param bulkListener the listener
*/
Builder(ElasticsearchClient client, Listener listener) {
Builder(ElasticsearchClient client, BulkListener bulkListener) {
this.client = client;
this.listener = listener;
this.bulkListener = bulkListener;
}
/**
@ -307,7 +323,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
* @return a bulk processor
*/
public DefaultBulkProcessor build() {
return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
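Putting the pieces together, a sketch of how a processor is assembled with the new listener type, mirroring DefaultBulkController.init(); the client and bulkController variables and the concrete limit values are illustrative assumptions, not defaults from the commit.

BulkListener listener = new DefaultBulkListener(bulkController, new DefaultBulkMetric(), true);
BulkProcessor processor = DefaultBulkProcessor.builder(client, listener)
        .setBulkActions(1000)                                        // illustrative action limit
        .setConcurrentRequests(1)                                     // 0 would select the synchronous handler
        .setFlushInterval(TimeValue.timeValueSeconds(30))             // illustrative flush interval
        .setBulkSize(ByteSizeValue.parseBytesSizeValue("10m", "bulkSize"))
        .build();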
@ -331,25 +347,24 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
private final DefaultBulkProcessor.Listener listener;
private final BulkListener bulkListener;
SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) {
Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null");
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
this.client = client;
this.listener = listener;
this.bulkListener = bulkListener;
}
@Override
public void execute(BulkRequest bulkRequest, long executionId) {
boolean afterCalled = false;
try {
listener.beforeBulk(executionId, bulkRequest);
bulkListener.beforeBulk(executionId, bulkRequest);
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
afterCalled = true;
listener.afterBulk(executionId, bulkRequest, bulkResponse);
bulkListener.afterBulk(executionId, bulkRequest, bulkResponse);
} catch (Exception e) {
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, e);
bulkListener.afterBulk(executionId, bulkRequest, e);
}
}
}
@ -364,16 +379,15 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
private final DefaultBulkProcessor.Listener listener;
private final BulkListener bulkListener;
private final Semaphore semaphore;
private final int concurrentRequests;
private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null");
private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) {
this.client = client;
this.listener = listener;
this.bulkListener = bulkListener;
this.concurrentRequests = concurrentRequests;
this.semaphore = new Semaphore(concurrentRequests);
}
@ -383,14 +397,14 @@ public class DefaultBulkProcessor implements BulkProcessor {
boolean bulkRequestSetupSuccessful = false;
boolean acquired = false;
try {
listener.beforeBulk(executionId, bulkRequest);
bulkListener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
acquired = true;
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
bulkListener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
}
@ -399,7 +413,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
bulkListener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
}
@ -408,11 +422,12 @@ public class DefaultBulkProcessor implements BulkProcessor {
bulkRequestSetupSuccessful = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.afterBulk(executionId, bulkRequest, e);
bulkListener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
listener.afterBulk(executionId, bulkRequest, e);
bulkListener.afterBulk(executionId, bulkRequest, e);
} finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
if (!bulkRequestSetupSuccessful && acquired) {
// if we fail on client.bulk() release the semaphore
semaphore.release();
}
}