diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java
index 69906ca..ec375eb 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java
@@ -14,6 +14,10 @@ public interface BulkController extends Closeable, Flushable {
void init(Settings settings);
+ void inactivate();
+
+ BulkMetric getBulkMetric();
+
Throwable getLastBulkError();
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
@@ -21,16 +25,15 @@ public interface BulkController extends Closeable, Flushable {
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
- void index(IndexRequest indexRequest);
+ void bulkIndex(IndexRequest indexRequest);
- void delete(DeleteRequest deleteRequest);
+ void bulkDelete(DeleteRequest deleteRequest);
- void update(UpdateRequest updateRequest);
+ void bulkUpdate(UpdateRequest updateRequest);
- boolean waitForResponses(long timeout, TimeUnit timeUnit);
+ boolean waitForBulkResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
-
}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java
new file mode 100644
index 0000000..045204f
--- /dev/null
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java
@@ -0,0 +1,46 @@
+package org.xbib.elx.api;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+/**
+ * A bulk listener for the execution.
+ */
+public interface BulkListener {
+
+ /**
+ * Callback before the bulk is executed.
+ *
+ * @param executionId execution ID
+ * @param request request
+ */
+ void beforeBulk(long executionId, BulkRequest request);
+
+ /**
+ * Callback after a successful execution of bulk request.
+ *
+ * @param executionId execution ID
+ * @param request request
+ * @param response response
+ */
+ void afterBulk(long executionId, BulkRequest request, BulkResponse response);
+
+ /**
+ * Callback after a failed execution of bulk request.
+ *
+ * Note that in case an instance of InterruptedException
is passed, which means that request
+ * processing has been
+ * cancelled externally, the thread's interruption status has been restored prior to calling this method.
+ *
+ * @param executionId execution ID
+ * @param request request
+ * @param failure failure
+ */
+ void afterBulk(long executionId, BulkRequest request, Throwable failure);
+
+ /**
+ * Get the last bulk error
+ * @return the lst bulk error
+ */
+ Throwable getLastBulkError();
+}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java
index af825e5..7e84376 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java
@@ -1,6 +1,5 @@
package org.xbib.elx.api;
-import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@@ -8,8 +7,6 @@ import java.io.Closeable;
public interface BulkMetric extends Closeable {
- void init(Settings settings);
-
Metered getTotalIngest();
Count getTotalIngestSizeInBytes();
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java
index 2e16b03..d45170e 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java
@@ -1,8 +1,6 @@
package org.xbib.elx.api;
import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import java.io.Closeable;
import java.io.Flushable;
@@ -16,47 +14,5 @@ public interface BulkProcessor extends Closeable, Flushable {
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
- interface BulkRequestHandler {
-
- void execute(BulkRequest bulkRequest, long executionId);
-
- boolean close(long timeout, TimeUnit unit) throws InterruptedException;
-
- }
-
- /**
- * A listener for the execution.
- */
- interface Listener {
-
- /**
- * Callback before the bulk is executed.
- *
- * @param executionId execution ID
- * @param request request
- */
- void beforeBulk(long executionId, BulkRequest request);
-
- /**
- * Callback after a successful execution of bulk request.
- *
- * @param executionId execution ID
- * @param request request
- * @param response response
- */
- void afterBulk(long executionId, BulkRequest request, BulkResponse response);
-
- /**
- * Callback after a failed execution of bulk request.
- *
- * Note that in case an instance of InterruptedException
is passed, which means that request
- * processing has been
- * cancelled externally, the thread's interruption status has been restored prior to calling this method.
- *
- * @param executionId execution ID
- * @param request request
- * @param failure failure
- */
- void afterBulk(long executionId, BulkRequest request, Throwable failure);
- }
+ BulkListener getBulkListener();
}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java
new file mode 100644
index 0000000..1bc3886
--- /dev/null
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java
@@ -0,0 +1,11 @@
+package org.xbib.elx.api;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import java.util.concurrent.TimeUnit;
+
+public interface BulkRequestHandler {
+
+ void execute(BulkRequest bulkRequest, long executionId);
+
+ boolean close(long timeout, TimeUnit unit) throws InterruptedException;
+}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java
index 63fab80..e1e5ac2 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java
@@ -36,13 +36,7 @@ public interface ExtendedClient extends Flushable, Closeable {
ElasticsearchClient getClient();
/**
- * Get bulk metric.
- * @return the bulk metric
- */
- BulkMetric getBulkMetric();
-
- /**
- * Get buulk control.
+ * Get bulk control.
* @return the bulk control
*/
BulkController getBulkController();
diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java
index 4a337ca..a4a54d4 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java
@@ -59,9 +59,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
@@ -72,7 +70,6 @@ import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.xbib.elx.api.BulkController;
-import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition;
@@ -114,8 +111,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private ElasticsearchClient client;
- private BulkMetric bulkMetric;
-
private BulkController bulkController;
private final AtomicBoolean closed;
@@ -173,11 +168,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return client;
}
- @Override
- public BulkMetric getBulkMetric() {
- return bulkMetric;
- }
-
@Override
public BulkController getBulkController() {
return bulkController;
@@ -189,12 +179,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (client == null) {
client = createClient(settings);
}
- if (bulkMetric == null) {
- bulkMetric = new DefaultBulkMetric();
- bulkMetric.init(settings);
- }
if (bulkController == null) {
- bulkController = new DefaultBulkController(this, bulkMetric);
+ bulkController = new DefaultBulkController(this);
bulkController.init(settings);
}
return this;
@@ -209,12 +195,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void close() throws IOException {
- ensureActive();
+ ensureClient();
if (closed.compareAndSet(false, true)) {
- if (bulkMetric != null) {
- logger.info("closing bulk metric");
- bulkMetric.close();
- }
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();
@@ -225,7 +207,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getClusterName() {
- ensureActive();
+ ensureClient();
try {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear();
ClusterStateResponse clusterStateResponse =
@@ -245,7 +227,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException {
- ensureActive();
+ ensureClient();
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
URL indexSettings = indexDefinition.getSettingsUrl();
if (indexSettings == null) {
@@ -305,7 +287,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException {
- ensureActive();
+ ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
@@ -326,7 +308,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException {
- ensureActive();
+ ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
@@ -336,9 +318,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
createIndexRequest.settings(settings);
}
if (mapping != null) {
- if (mapping.size() != 1) {
- throw new IllegalArgumentException("mapping invalid, just use 'doc' for mapping");
- }
createIndexRequest.mapping("_doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
@@ -355,7 +334,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient deleteIndex(String index) {
- ensureActive();
+ ensureClient();
if (index == null) {
logger.warn("no index name given to delete index");
return this;
@@ -375,7 +354,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
if (bulkController != null) {
- ensureActive();
+ ensureClient();
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
}
return this;
@@ -384,7 +363,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException {
if (bulkController != null) {
- ensureActive();
+ ensureClient();
bulkController.stopBulkMode(indexDefinition);
}
return this;
@@ -393,7 +372,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
if (bulkController != null) {
- ensureActive();
+ ensureClient();
bulkController.stopBulkMode(index, timeout, timeUnit);
}
return this;
@@ -413,8 +392,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(IndexRequest indexRequest) {
- ensureActive();
- bulkController.index(indexRequest);
+ ensureClient();
+ bulkController.bulkIndex(indexRequest);
return this;
}
@@ -425,13 +404,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient delete(DeleteRequest deleteRequest) {
- ensureActive();
- bulkController.delete(deleteRequest);
+ ensureClient();
+ bulkController.bulkDelete(deleteRequest);
return this;
}
@Override
- public ExtendedClient update(String index, String id, BytesReference source) throws IOException {
+ public ExtendedClient update(String index, String id, BytesReference source) {
return update(new UpdateRequest().index(index).id(id)
.doc(source, XContentType.JSON));
}
@@ -444,20 +423,20 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient update(UpdateRequest updateRequest) {
- ensureActive();
- bulkController.update(updateRequest);
+ ensureClient();
+ bulkController.bulkUpdate(updateRequest);
return this;
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
- ensureActive();
- return bulkController.waitForResponses(timeout, timeUnit);
+ ensureClient();
+ return bulkController.waitForBulkResponses(timeout, timeUnit);
}
@Override
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
- ensureActive();
+ ensureClient();
ensureIndexGiven(index);
GetSettingsRequest settingsRequest = new GetSettingsRequest();
settingsRequest.indices(index);
@@ -481,7 +460,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
- ensureActive();
+ ensureClient();
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
@@ -495,7 +474,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
- ensureActive();
+ ensureClient();
try {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
@@ -552,7 +531,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient flushIndex(String index) {
if (index != null) {
- ensureActive();
+ ensureClient();
client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
}
return this;
@@ -561,7 +540,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient refreshIndex(String index) {
if (index != null) {
- ensureActive();
+ ensureClient();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet();
}
return this;
@@ -569,10 +548,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String resolveMostRecentIndex(String alias) {
- ensureActive();
if (alias == null) {
return null;
}
+ ensureClient();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@@ -597,7 +576,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String resolveAlias(String alias) {
- ensureActive();
+ ensureClient();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.blocks(false);
clusterStateRequest.metaData(true);
@@ -639,7 +618,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public IndexShiftResult shiftIndex(String index, String fullIndexName,
List additionalAliases, IndexAliasAdder adder) {
- ensureActive();
+ ensureClient();
if (index == null) {
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
@@ -727,7 +706,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (index.equals(fullIndexName)) {
return EMPTY_INDEX_PRUNE_RESULT;
}
- ensureActive();
+ ensureClient();
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@@ -779,7 +758,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public Long mostRecentDocument(String index, String timestampfieldname) {
- ensureActive();
+ ensureClient();
SortBuilder> sort = SortBuilders
.fieldSort(timestampfieldname)
.order(SortOrder.DESC);
@@ -868,7 +847,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
- ensureActive();
+ ensureClient();
if (index == null) {
throw new IOException("no index name given");
}
@@ -885,7 +864,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
}
- private void ensureActive() {
+ private void ensureClient() {
if (this instanceof MockExtendedClient) {
return;
}
@@ -917,7 +896,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
public void checkMapping(String index) {
- ensureActive();
+ ensureClient();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap> map = getMappingsResponse.getMappings();
@@ -946,7 +925,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (total.value > 0L) {
Map fields = new TreeMap<>();
Map root = mappingMetaData.getSourceAsMap();
- checkMapping(index, type, "", "", root, fields);
+ checkMapping(index, "", "", root, fields);
AtomicInteger empty = new AtomicInteger();
Map map = sortByValue(fields);
map.forEach((key, value) -> {
@@ -965,7 +944,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@SuppressWarnings("unchecked")
- private void checkMapping(String index, String type,
+ private void checkMapping(String index,
String pathDef, String fieldName, Map map,
Map fields) {
String path = pathDef;
@@ -990,7 +969,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
String fieldType = o instanceof String ? o.toString() : null;
// do not recurse into our custom field mapper
if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) {
- checkMapping(index, type, path, key, child, fields);
+ checkMapping(index, path, key, child, fields);
}
} else if ("type".equals(key)) {
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java
index 53970a3..968d678 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java
@@ -2,9 +2,6 @@ package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@@ -12,6 +9,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkController;
+import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.ExtendedClient;
@@ -33,27 +31,23 @@ public class DefaultBulkController implements BulkController {
private final BulkMetric bulkMetric;
+ private BulkProcessor bulkProcessor;
+
private final List indexNames;
private final Map startBulkRefreshIntervals;
private final Map stopBulkRefreshIntervals;
- private long maxWaitTime;
+ private final long maxWaitTime;
- private TimeUnit maxWaitTimeUnit;
+ private final TimeUnit maxWaitTimeUnit;
- private BulkProcessor bulkProcessor;
+ private final AtomicBoolean active;
- private BulkListener bulkListener;
-
- private AtomicBoolean active;
-
- private boolean enableBulkLogging;
-
- public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
+ public DefaultBulkController(ExtendedClient client) {
this.client = client;
- this.bulkMetric = bulkMetric;
+ this.bulkMetric = new DefaultBulkMetric();
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
@@ -62,9 +56,14 @@ public class DefaultBulkController implements BulkController {
this.maxWaitTimeUnit = TimeUnit.SECONDS;
}
+ @Override
+ public BulkMetric getBulkMetric() {
+ return bulkMetric;
+ }
+
@Override
public Throwable getLastBulkError() {
- return bulkListener.getLastBulkError();
+ return bulkProcessor.getBulkListener().getLastBulkError();
}
@Override
@@ -78,22 +77,27 @@ public class DefaultBulkController implements BulkController {
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest"));
- this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
+ boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
Parameters.ENABLE_BULK_LOGGING.getValue());
- this.bulkListener = new BulkListener();
+ BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest)
.build();
- this.active.set(true);
if (logger.isInfoEnabled()) {
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {} bulk logging = {} logger debug = {} from settings = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest,
enableBulkLogging, logger.isDebugEnabled(), settings.toDelimitedString(','));
}
+ this.active.set(true);
+ }
+
+ @Override
+ public void inactivate() {
+ this.active.set(false);
}
@Override
@@ -118,65 +122,49 @@ public class DefaultBulkController implements BulkController {
}
@Override
- public void index(IndexRequest indexRequest) {
+ public void bulkIndex(IndexRequest indexRequest) {
ensureActiveAndBulk();
- if (!active.get()) {
- throw new IllegalStateException("inactive");
- }
try {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id());
- }
+ bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id());
bulkProcessor.add(indexRequest);
} catch (Exception e) {
- bulkListener.lastBulkError = e;
- active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of index failed: " + e.getMessage(), e);
}
+ inactivate();
}
}
@Override
- public void delete(DeleteRequest deleteRequest) {
- if (!active.get()) {
- throw new IllegalStateException("inactive");
- }
+ public void bulkDelete(DeleteRequest deleteRequest) {
+ ensureActiveAndBulk();
try {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id());
- }
+ bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id());
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
- bulkListener.lastBulkError = e;
- active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of delete failed: " + e.getMessage(), e);
}
+ inactivate();
}
}
@Override
- public void update(UpdateRequest updateRequest) {
- if (!active.get()) {
- throw new IllegalStateException("inactive");
- }
+ public void bulkUpdate(UpdateRequest updateRequest) {
+ ensureActiveAndBulk();
try {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id());
- }
+ bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id());
bulkProcessor.add(updateRequest);
} catch (Exception e) {
- bulkListener.lastBulkError = e;
- active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of update failed: " + e.getMessage(), e);
}
+ inactivate();
}
}
@Override
- public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
+ public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
try {
return bulkProcessor.awaitFlush(timeout, timeUnit);
} catch (InterruptedException e) {
@@ -195,7 +183,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
flush();
- if (waitForResponses(timeout, timeUnit)) {
+ if (waitForBulkResponses(timeout, timeUnit)) {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
@@ -217,6 +205,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void close() throws IOException {
flush();
+ bulkMetric.close();
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
@@ -238,92 +227,5 @@ public class DefaultBulkController implements BulkController {
if (bulkProcessor == null) {
throw new UnsupportedOperationException("bulk processor not present");
}
- if (bulkListener == null) {
- throw new UnsupportedOperationException("bulk listener not present");
- }
- }
-
- private class BulkListener implements DefaultBulkProcessor.Listener {
-
- private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
-
- private Throwable lastBulkError = null;
-
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- long l = 0;
- if (bulkMetric != null) {
- l = bulkMetric.getCurrentIngest().getCount();
- bulkMetric.getCurrentIngest().inc();
- int n = request.numberOfActions();
- bulkMetric.getSubmitted().inc(n);
- bulkMetric.getCurrentIngestNumDocs().inc(n);
- bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
- }
- if (enableBulkLogging && logger.isDebugEnabled()) {
- logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
- executionId,
- request.numberOfActions(),
- request.estimatedSizeInBytes(),
- l);
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- long l = 0;
- if (bulkMetric != null) {
- l = bulkMetric.getCurrentIngest().getCount();
- bulkMetric.getCurrentIngest().dec();
- bulkMetric.getSucceeded().inc(response.getItems().length);
- }
- int n = 0;
- for (BulkItemResponse itemResponse : response.getItems()) {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
- }
- if (itemResponse.isFailed()) {
- n++;
- if (bulkMetric != null) {
- bulkMetric.getSucceeded().dec(1);
- bulkMetric.getFailed().inc(1);
- }
- }
- }
- if (enableBulkLogging && logger.isDebugEnabled() && bulkMetric != null) {
- logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
- executionId,
- bulkMetric.getSucceeded().getCount(),
- bulkMetric.getFailed().getCount(),
- response.getTook().millis(),
- l);
- }
- if (n > 0) {
- if (enableBulkLogging && logger.isErrorEnabled()) {
- logger.error("bulk [{}] failed with {} failed items, failure message = {}",
- executionId, n, response.buildFailureMessage());
- }
- } else {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
- }
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().dec();
- }
- lastBulkError = failure;
- active.set(false);
- if (enableBulkLogging && logger.isErrorEnabled()) {
- logger.error("after bulk [" + executionId + "] error", failure);
- }
- }
-
- Throwable getLastBulkError() {
- return lastBulkError;
- }
}
}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java
new file mode 100644
index 0000000..c57da2d
--- /dev/null
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java
@@ -0,0 +1,95 @@
+package org.xbib.elx.common;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.xbib.elx.api.BulkController;
+import org.xbib.elx.api.BulkListener;
+import org.xbib.elx.api.BulkMetric;
+
+public class DefaultBulkListener implements BulkListener {
+
+ private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName());
+
+ private final BulkController bulkController;
+
+ private final BulkMetric bulkMetric;
+
+ private final boolean isBulkLoggingEnabled;
+
+ private Throwable lastBulkError;
+
+ public DefaultBulkListener(BulkController bulkController,
+ BulkMetric bulkMetric,
+ boolean isBulkLoggingEnabled) {
+ this.bulkController = bulkController;
+ this.bulkMetric = bulkMetric;
+ this.isBulkLoggingEnabled = isBulkLoggingEnabled;
+ }
+
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ long l = bulkMetric.getCurrentIngest().getCount();
+ bulkMetric.getCurrentIngest().inc();
+ int n = request.numberOfActions();
+ bulkMetric.getSubmitted().inc(n);
+ bulkMetric.getCurrentIngestNumDocs().inc(n);
+ bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
+ if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
+ logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
+ executionId,
+ request.numberOfActions(),
+ request.estimatedSizeInBytes(),
+ l);
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ long l = bulkMetric.getCurrentIngest().getCount();
+ bulkMetric.getCurrentIngest().dec();
+ bulkMetric.getSucceeded().inc(response.getItems().length);
+ int n = 0;
+ for (BulkItemResponse itemResponse : response.getItems()) {
+ bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
+ if (itemResponse.isFailed()) {
+ n++;
+ bulkMetric.getSucceeded().dec(1);
+ bulkMetric.getFailed().inc(1);
+ }
+ }
+ if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
+ logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
+ executionId,
+ bulkMetric.getSucceeded().getCount(),
+ bulkMetric.getFailed().getCount(),
+ response.getTook().millis(),
+ l);
+ }
+ if (n > 0) {
+ if (isBulkLoggingEnabled && logger.isErrorEnabled()) {
+ logger.error("bulk [{}] failed with {} failed items, failure message = {}",
+ executionId, n, response.buildFailureMessage());
+ }
+ } else {
+ bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ bulkMetric.getCurrentIngest().dec();
+ lastBulkError = failure;
+ if (logger.isErrorEnabled()) {
+ logger.error("after bulk [" + executionId + "] error", failure);
+ }
+ bulkController.inactivate();
+ }
+
+ @Override
+ public Throwable getLastBulkError() {
+ return lastBulkError;
+ }
+}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java
index 8127e29..1350e65 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java
@@ -1,6 +1,5 @@
package org.xbib.elx.common;
-import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@@ -37,10 +36,6 @@ public class DefaultBulkMetric implements BulkMetric {
submitted = new CountMetric();
succeeded = new CountMetric();
failed = new CountMetric();
- }
-
- @Override
- public void init(Settings settings) {
start();
}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java
index 811f7de..852b1fb 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java
@@ -15,7 +15,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkProcessor;
+import org.xbib.elx.api.BulkRequestHandler;
import java.util.Objects;
import java.util.concurrent.Executors;
@@ -33,6 +35,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DefaultBulkProcessor implements BulkProcessor {
+ private final BulkListener bulkListener;
+
private final int bulkActions;
private final long bulkSize;
@@ -49,16 +53,22 @@ public class DefaultBulkProcessor implements BulkProcessor {
private volatile boolean closed;
- private DefaultBulkProcessor(ElasticsearchClient client, Listener listener, String name, int concurrentRequests,
- int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
+ private DefaultBulkProcessor(ElasticsearchClient client,
+ BulkListener bulkListener,
+ String name,
+ int concurrentRequests,
+ int bulkActions,
+ ByteSizeValue bulkSize,
+ TimeValue flushInterval) {
+ this.bulkListener = bulkListener;
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
- new SyncBulkRequestHandler(client, listener) :
- new AsyncBulkRequestHandler(client, listener, concurrentRequests);
+ new SyncBulkRequestHandler(client, bulkListener) :
+ new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests);
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
EsExecutors.daemonThreadFactory(Settings.EMPTY,
@@ -73,10 +83,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
- public static Builder builder(ElasticsearchClient client, Listener listener) {
+ public static Builder builder(ElasticsearchClient client,
+ BulkListener bulkListener) {
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
- Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null");
- return new Builder(client, listener);
+ Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null");
+ return new Builder(client, bulkListener);
}
/**
@@ -132,6 +143,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
return bulkRequestHandler.close(timeout, unit);
}
+ @Override
+ public BulkListener getBulkListener() {
+ return bulkListener;
+ }
+
/**
* Adds either a delete or an index request.
*
@@ -216,7 +232,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
- private final Listener listener;
+ private final BulkListener bulkListener;
private String name;
@@ -233,11 +249,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
* to be notified on the completion of bulk requests.
*
* @param client the client
- * @param listener the listener
+ * @param bulkListener the listener
*/
- Builder(ElasticsearchClient client, Listener listener) {
+ Builder(ElasticsearchClient client, BulkListener bulkListener) {
this.client = client;
- this.listener = listener;
+ this.bulkListener = bulkListener;
}
/**
@@ -307,7 +323,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
* @return a bulk processor
*/
public DefaultBulkProcessor build() {
- return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
+ return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
@@ -331,25 +347,24 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
- private final DefaultBulkProcessor.Listener listener;
+ private final BulkListener bulkListener;
- SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) {
- Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null");
+ SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
this.client = client;
- this.listener = listener;
+ this.bulkListener = bulkListener;
}
@Override
public void execute(BulkRequest bulkRequest, long executionId) {
boolean afterCalled = false;
try {
- listener.beforeBulk(executionId, bulkRequest);
+ bulkListener.beforeBulk(executionId, bulkRequest);
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
afterCalled = true;
- listener.afterBulk(executionId, bulkRequest, bulkResponse);
+ bulkListener.afterBulk(executionId, bulkRequest, bulkResponse);
} catch (Exception e) {
if (!afterCalled) {
- listener.afterBulk(executionId, bulkRequest, e);
+ bulkListener.afterBulk(executionId, bulkRequest, e);
}
}
}
@@ -364,16 +379,15 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
- private final DefaultBulkProcessor.Listener listener;
+ private final BulkListener bulkListener;
private final Semaphore semaphore;
private final int concurrentRequests;
- private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
- Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null");
+ private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) {
this.client = client;
- this.listener = listener;
+ this.bulkListener = bulkListener;
this.concurrentRequests = concurrentRequests;
this.semaphore = new Semaphore(concurrentRequests);
}
@@ -383,14 +397,14 @@ public class DefaultBulkProcessor implements BulkProcessor {
boolean bulkRequestSetupSuccessful = false;
boolean acquired = false;
try {
- listener.beforeBulk(executionId, bulkRequest);
+ bulkListener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
acquired = true;
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
try {
- listener.afterBulk(executionId, bulkRequest, response);
+ bulkListener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
}
@@ -399,7 +413,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
@Override
public void onFailure(Exception e) {
try {
- listener.afterBulk(executionId, bulkRequest, e);
+ bulkListener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
}
@@ -408,11 +422,12 @@ public class DefaultBulkProcessor implements BulkProcessor {
bulkRequestSetupSuccessful = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- listener.afterBulk(executionId, bulkRequest, e);
+ bulkListener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
- listener.afterBulk(executionId, bulkRequest, e);
+ bulkListener.afterBulk(executionId, bulkRequest, e);
} finally {
- if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
+ if (!bulkRequestSetupSuccessful && acquired) {
+ // if we fail on client.bulk() release the semaphore
semaphore.release();
}
}