diff --git a/.travis.yml b/.travis.yml
index 0c75adf..94d2a22 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,12 +1,3 @@
language: java
-sudo: required
jdk:
- openjdk11
-cache:
- directories:
- - $HOME/.m2
-after_success:
- - ./gradlew sonarqube -Dsonar.host.url=https://sonarqube.com -Dsonar.login=$SONAR_TOKEN
-env:
- global:
- secure: n1Ai4q/yMLn/Pg5pA4lTavoJoe7mQYB1PSKnZAqwbgyla94ySzK6iyBCBiNs/foMPisB/x+DHvmUXTsjvquw9Ay48ZITCV3xhcWzD0eZM2TMoG19CpRAEe8L8LNuYiti9k89ijDdUGZ5ifsvQNTGNHksouayAuApC3PrTUejJfR6SYrp1ZsQTbsMlr+4XU3p7QknK5rGgOwATIMP28F+bVnB05WJtlJA3b0SeucCurn3wJ4FGBQXRYmdlT7bQhNE4QgZM1VzcUFD/K0TBxzzq/otb/lNRSifyoekktDmJwQnaT9uQ4R8R6KdQ2Kb38Rvgjur+TKm5i1G8qS2+6LnIxQJG1aw3JvKK6W0wWCgnAVVRrXaCLday9NuY59tuh1mfjQ10UcsMNKcTdcKEMrLow506wSETcXc7L/LEnneWQyJJeV4vhPqR7KJfsBbeqgz3yIfsCn1GZVWFlfegzYCN52YTl0Y0uRD2Z+TnzQu+Bf4DzaWXLge1rz31xkhyeNNspub4h024+XqBjcMm6M9mlMzmmK8t2DIwPy/BlQbFBUyhrxziuR/5/2NEDPyHltvWkRb4AUIa25WJqkV0gTBegbMadZ9DyOo6Ea7aoVFBae2WGR08F1kzABsWrd1S7UJmWxW35iyMEtoAIayXphIK98qO5aCutwZ+3iOQazxbAs=
diff --git a/build.gradle b/build.gradle
index 2f8e691..60e581e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2,7 +2,11 @@ plugins {
id "org.sonarqube" version "2.8"
id "io.codearte.nexus-staging" version "0.21.1"
id "com.github.spotbugs" version "2.0.1"
- id "org.xbib.gradle.plugin.asciidoctor" version "1.5.6.0.1"
+ id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.1"
+}
+
+if (JavaVersion.current() < JavaVersion.VERSION_11) {
+ throw new GradleException("This build must be run with Java/OpenJDK 11+")
}
subprojects {
@@ -19,10 +23,10 @@ subprojects {
dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-api:${project.property('junit.version')}"
- testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}"
testImplementation "org.apache.logging.log4j:log4j-core:${project.property('log4j.version')}"
testImplementation "org.apache.logging.log4j:log4j-jul:${project.property('log4j.version')}"
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:${project.property('log4j.version')}"
+ testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}"
asciidoclet "org.xbib:asciidoclet:${project.property('asciidoclet.version')}"
}
@@ -46,14 +50,11 @@ subprojects {
test {
enabled = true
useJUnitPlatform()
- // we MUST use this hack because of Elasticsearch 2.2.1 Lucene 5.4.1 MMapDirectory unmap() hackery
- doFirst {
- jvmArgs = [
- '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
- '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
- '--add-opens=java.base/java.nio=ALL-UNNAMED'
- ]
- }
+ jvmArgs = [
+ '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
+ '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
+ '--add-opens=java.base/java.nio=ALL-UNNAMED'
+ ]
systemProperty 'java.util.logging.manager', 'org.apache.logging.log4j.jul.LogManager'
systemProperty 'jna.debug_load', 'true'
systemProperty 'path.home', "${project.buildDir}"
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java
index 69906ca..ec375eb 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java
@@ -14,6 +14,10 @@ public interface BulkController extends Closeable, Flushable {
void init(Settings settings);
+ void inactivate();
+
+ BulkMetric getBulkMetric();
+
Throwable getLastBulkError();
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
@@ -21,16 +25,15 @@ public interface BulkController extends Closeable, Flushable {
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
- void index(IndexRequest indexRequest);
+ void bulkIndex(IndexRequest indexRequest);
- void delete(DeleteRequest deleteRequest);
+ void bulkDelete(DeleteRequest deleteRequest);
- void update(UpdateRequest updateRequest);
+ void bulkUpdate(UpdateRequest updateRequest);
- boolean waitForResponses(long timeout, TimeUnit timeUnit);
+ boolean waitForBulkResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
-
}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java
new file mode 100644
index 0000000..9caa339
--- /dev/null
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java
@@ -0,0 +1,43 @@
+package org.xbib.elx.api;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+public interface BulkListener {
+
+ /**
+ * Callback before the bulk is executed.
+ *
+ * @param executionId execution ID
+ * @param request request
+ */
+ void beforeBulk(long executionId, BulkRequest request);
+
+ /**
+ * Callback after a successful execution of bulk request.
+ *
+ * @param executionId execution ID
+ * @param request request
+ * @param response response
+ */
+ void afterBulk(long executionId, BulkRequest request, BulkResponse response);
+
+ /**
+ * Callback after a failed execution of bulk request.
+ *
+ * Note that in case an instance of InterruptedException
is passed, which means that request
+ * processing has been
+ * cancelled externally, the thread's interruption status has been restored prior to calling this method.
+ *
+ * @param executionId execution ID
+ * @param request request
+ * @param failure failure
+ */
+ void afterBulk(long executionId, BulkRequest request, Throwable failure);
+
+ /**
+ * Get the last bulk error.
+ * @return the last bulk error
+ */
+ Throwable getLastBulkError();
+}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java
index af825e5..7e84376 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java
@@ -1,6 +1,5 @@
package org.xbib.elx.api;
-import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@@ -8,8 +7,6 @@ import java.io.Closeable;
public interface BulkMetric extends Closeable {
- void init(Settings settings);
-
Metered getTotalIngest();
Count getTotalIngestSizeInBytes();
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java
index cb994e0..73cc462 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java
@@ -1,8 +1,6 @@
package org.xbib.elx.api;
import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import java.io.Closeable;
import java.io.Flushable;
@@ -13,54 +11,9 @@ public interface BulkProcessor extends Closeable, Flushable {
@SuppressWarnings("rawtypes")
BulkProcessor add(ActionRequest request);
- @SuppressWarnings("rawtypes")
- BulkProcessor add(ActionRequest request, Object payload);
-
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
- interface BulkRequestHandler {
-
- void execute(BulkRequest bulkRequest, long executionId);
-
- boolean close(long timeout, TimeUnit unit) throws InterruptedException;
-
- }
-
- /**
- * A listener for the execution.
- */
- interface Listener {
-
- /**
- * Callback before the bulk is executed.
- *
- * @param executionId execution ID
- * @param request request
- */
- void beforeBulk(long executionId, BulkRequest request);
-
- /**
- * Callback after a successful execution of bulk request.
- *
- * @param executionId execution ID
- * @param request request
- * @param response response
- */
- void afterBulk(long executionId, BulkRequest request, BulkResponse response);
-
- /**
- * Callback after a failed execution of bulk request.
- *
- * Note that in case an instance of InterruptedException
is passed, which means that request
- * processing has been
- * cancelled externally, the thread's interruption status has been restored prior to calling this method.
- *
- * @param executionId execution ID
- * @param request request
- * @param failure failure
- */
- void afterBulk(long executionId, BulkRequest request, Throwable failure);
- }
+ BulkListener getBulkListener();
}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java
new file mode 100644
index 0000000..1bc3886
--- /dev/null
+++ b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java
@@ -0,0 +1,11 @@
+package org.xbib.elx.api;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import java.util.concurrent.TimeUnit;
+
+public interface BulkRequestHandler {
+
+ void execute(BulkRequest bulkRequest, long executionId);
+
+ boolean close(long timeout, TimeUnit unit) throws InterruptedException;
+}
diff --git a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java
index 0ec8639..6114621 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java
@@ -6,6 +6,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.Closeable;
import java.io.Flushable;
@@ -34,12 +35,6 @@ public interface ExtendedClient extends Flushable, Closeable {
*/
ElasticsearchClient getClient();
- /**
- * Get bulk metric.
- * @return the bulk metric
- */
- BulkMetric getBulkMetric();
-
/**
* Get buulk control.
* @return the bulk control
@@ -190,6 +185,8 @@ public interface ExtendedClient extends Flushable, Closeable {
*/
ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException;
+ ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException;
+
/**
* Create a new index.
*
@@ -199,7 +196,7 @@ public interface ExtendedClient extends Flushable, Closeable {
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
- ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException;
+ ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException;
/**
* Create a new index.
@@ -317,7 +314,7 @@ public interface ExtendedClient extends Flushable, Closeable {
/**
* Force segment merge of an index.
- * @param indexDefinition th eindex definition
+ * @param indexDefinition the index definition
* @return this
*/
boolean forceMerge(IndexDefinition indexDefinition);
@@ -398,7 +395,7 @@ public interface ExtendedClient extends Flushable, Closeable {
String resolveMostRecentIndex(String alias);
/**
- * Get all aliases.
+ * Get all index aliases.
* @param index the index
* @return map of index aliases
*/
diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java
index 0c118f8..a4ef207 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java
@@ -4,7 +4,7 @@ import java.util.List;
public interface IndexPruneResult {
- enum State { NOTHING_TO_DO, SUCCESS, NONE };
+ enum State { NOTHING_TO_DO, SUCCESS, NONE, FAIL };
State getState();
diff --git a/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java b/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java
index 6640686..bc0eb16 100644
--- a/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java
+++ b/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java
@@ -1,6 +1,5 @@
package org.xbib.elx.api;
-@FunctionalInterface
public interface ReadClientProvider {
C getReadClient();
diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java
index dea431c..8ccb489 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java
@@ -2,6 +2,7 @@ package org.xbib.elx.common;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
@@ -58,6 +59,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@@ -67,7 +69,6 @@ import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.xbib.elx.api.BulkController;
-import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition;
@@ -85,7 +86,6 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -108,22 +108,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName());
- /**
- * The one and only index type name used in the extended client.
- * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
- */
- private static final String TYPE_NAME = "doc";
-
- /**
- * The Elasticsearch client.
- */
private ElasticsearchClient client;
- private BulkMetric bulkMetric;
-
private BulkController bulkController;
- private AtomicBoolean closed;
+ private final AtomicBoolean closed;
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
@Override
@@ -178,11 +167,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return client;
}
- @Override
- public BulkMetric getBulkMetric() {
- return bulkMetric;
- }
-
@Override
public BulkController getBulkController() {
return bulkController;
@@ -190,15 +174,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public AbstractExtendedClient init(Settings settings) throws IOException {
+ logger.info("initializing with settings = " + settings.toDelimitedString(','));
if (client == null) {
client = createClient(settings);
}
- if (bulkMetric == null) {
- this.bulkMetric = new DefaultBulkMetric();
- this.bulkMetric.init(settings);
- }
if (bulkController == null) {
- this.bulkController = new DefaultBulkController(this, bulkMetric);
+ this.bulkController = new DefaultBulkController(this);
bulkController.init(settings);
}
return this;
@@ -213,12 +194,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void close() throws IOException {
- ensureActive();
+ ensureClient();
if (closed.compareAndSet(false, true)) {
- if (bulkMetric != null) {
- logger.info("closing bulk metric");
- bulkMetric.close();
- }
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();
@@ -229,9 +206,9 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getClusterName() {
- ensureActive();
+ ensureClient();
try {
- ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
+ ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear();
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
@@ -249,7 +226,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException {
- ensureActive();
+ ensureClient();
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
URL indexSettings = indexDefinition.getSettingsUrl();
if (indexSettings == null) {
@@ -284,7 +261,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index) throws IOException {
- return newIndex(index, Settings.EMPTY, (Map) null);
+ return newIndex(index, Settings.EMPTY, (Map) null);
}
@Override
@@ -296,7 +273,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings) throws IOException {
- return newIndex(index, settings, (Map) null);
+ return newIndex(index, settings, (Map) null);
}
@Override
@@ -306,8 +283,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
- public ExtendedClient newIndex(String index, Settings settings, Map mapping) {
- ensureActive();
+ public ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) {
+ ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
@@ -317,11 +294,35 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
createIndexRequest.settings(settings);
}
if (mapping != null) {
- createIndexRequest.mapping(TYPE_NAME, mapping);
+ createIndexRequest.mapping("doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
- logger.info("index {} created: {}", index, createIndexResponse);
- return this;
+ if (createIndexResponse.isAcknowledged()) {
+ return this;
+ }
+ throw new IllegalStateException("index creation not acknowledged: " + index);
+ }
+
+
+ @Override
+ public ExtendedClient newIndex(String index, Settings settings, Map mapping) {
+ ensureClient();
+ if (index == null) {
+ logger.warn("no index name given to create index");
+ return this;
+ }
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
+ if (settings != null) {
+ createIndexRequest.settings(settings);
+ }
+ if (mapping != null) {
+ createIndexRequest.mapping("doc", mapping);
+ }
+ CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
+ if (createIndexResponse.isAcknowledged()) {
+ return this;
+ }
+ throw new IllegalStateException("index creation not acknowledged: " + index);
}
@Override
@@ -331,7 +332,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient deleteIndex(String index) {
- ensureActive();
+ ensureClient();
if (index == null) {
logger.warn("no index name given to delete index");
return this;
@@ -351,7 +352,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
if (bulkController != null) {
- ensureActive();
+ ensureClient();
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
}
return this;
@@ -360,7 +361,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException {
if (bulkController != null) {
- ensureActive();
+ ensureClient();
bulkController.stopBulkMode(indexDefinition);
}
return this;
@@ -369,7 +370,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
if (bulkController != null) {
- ensureActive();
+ ensureClient();
bulkController.stopBulkMode(index, timeout, timeUnit);
}
return this;
@@ -377,63 +378,63 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(String index, String id, boolean create, String source) {
- return index(new IndexRequest(index, TYPE_NAME, id).create(create)
+ return index(new IndexRequest().index(index).type("doc").id(id).create(create)
.source(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
- return index(new IndexRequest(index, TYPE_NAME, id).create(create)
+ return index(new IndexRequest().index(index).type("doc").id(id).create(create)
.source(source));
}
@Override
public ExtendedClient index(IndexRequest indexRequest) {
- ensureActive();
- bulkController.index(indexRequest);
+ ensureClient();
+ bulkController.bulkIndex(indexRequest);
return this;
}
@Override
public ExtendedClient delete(String index, String id) {
- return delete(new DeleteRequest(index, TYPE_NAME, id));
+ return delete(new DeleteRequest().index(index).type("doc").id(id));
}
@Override
public ExtendedClient delete(DeleteRequest deleteRequest) {
- ensureActive();
- bulkController.delete(deleteRequest);
+ ensureClient();
+ bulkController.bulkDelete(deleteRequest);
return this;
}
@Override
public ExtendedClient update(String index, String id, BytesReference source) {
- return update(new UpdateRequest(index, TYPE_NAME, id)
+ return update(new UpdateRequest().index(index).type("doc").id(id)
.doc(source));
}
@Override
public ExtendedClient update(String index, String id, String source) {
- return update(new UpdateRequest(index, TYPE_NAME, id)
+ return update(new UpdateRequest().index(index).type("doc").id(id)
.doc(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
public ExtendedClient update(UpdateRequest updateRequest) {
- ensureActive();
- bulkController.update(updateRequest);
+ ensureClient();
+ bulkController.bulkUpdate(updateRequest);
return this;
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
- ensureActive();
- return bulkController.waitForResponses(timeout, timeUnit);
+ ensureClient();
+ return bulkController.waitForBulkResponses(timeout, timeUnit);
}
@Override
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
- ensureActive();
+ ensureClient();
ensureIndexGiven(index);
GetSettingsRequest settingsRequest = new GetSettingsRequest();
settingsRequest.indices(index);
@@ -448,7 +449,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
- logger.error("timeout waiting for recovery");
+ logger.warn("timeout waiting for recovery");
return false;
}
}
@@ -457,15 +458,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
- ensureActive();
+ ensureClient();
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
- if (logger.isErrorEnabled()) {
- logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
- }
+ logger.warn("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
return false;
}
return true;
@@ -473,7 +472,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
- ensureActive();
+ ensureClient();
try {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
@@ -530,7 +529,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient flushIndex(String index) {
if (index != null) {
- ensureActive();
+ ensureClient();
client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
}
return this;
@@ -539,7 +538,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient refreshIndex(String index) {
if (index != null) {
- ensureActive();
+ ensureClient();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet();
}
return this;
@@ -550,7 +549,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (alias == null) {
return null;
}
- ensureActive();
+ ensureClient();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@@ -574,9 +573,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String resolveAlias(String alias) {
- ensureActive();
+ ensureClient();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
+ clusterStateRequest.blocks(false);
clusterStateRequest.metaData(true);
+ clusterStateRequest.nodes(false);
+ clusterStateRequest.routingTable(false);
+ clusterStateRequest.customs(false);
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
SortedMap map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup();
@@ -612,7 +615,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public IndexShiftResult shiftIndex(String index, String fullIndexName,
List additionalAliases, IndexAliasAdder adder) {
- ensureActive();
+ ensureClient();
if (index == null) {
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
@@ -706,11 +709,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (index.equals(fullIndexName)) {
return EMPTY_INDEX_PRUNE_RESULT;
}
- ensureActive();
+ ensureClient();
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
- logger.info("{} indices", getIndexResponse.getIndices().length);
+ logger.info("pruneIndex: total of {} indices", getIndexResponse.getIndices().length);
List candidateIndices = new ArrayList<>();
for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s);
@@ -746,20 +749,29 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest()
.indices(indicesToDelete.toArray(s));
DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
- return new SuccessPruneResult(candidateIndices, indicesToDelete, response);
+ if (response.isAcknowledged()) {
+ logger.log(Level.INFO, "deletion of {} acknowledged, waiting for GREEN", Arrays.asList(s));
+ waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
+ return new SuccessPruneResult(candidateIndices, indicesToDelete, response);
+ } else {
+ logger.log(Level.WARN, "deletion of {} not acknowledged", Arrays.asList(s));
+ return new FailPruneResult(candidateIndices, indicesToDelete, response);
+ }
}
@Override
public Long mostRecentDocument(String index, String timestampfieldname) {
- ensureActive();
- SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.field(timestampfieldname);
- sourceBuilder.size(1);
- sourceBuilder.sort(sort);
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices(index);
- searchRequest.source(sourceBuilder);
+ ensureClient();
+ SortBuilder sort = SortBuilders
+ .fieldSort(timestampfieldname)
+ .order(SortOrder.DESC);
+ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
+ .sort(sort)
+ .field(timestampfieldname)
+ .size(1);
+ SearchRequest searchRequest = new SearchRequest()
+ .indices(index)
+ .source(sourceBuilder);
SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
if (searchResponse.getHits().getHits().length == 1) {
SearchHit hit = searchResponse.getHits().getHits()[0];
@@ -837,7 +849,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
- ensureActive();
+ ensureClient();
if (index == null) {
throw new IOException("no index name given");
}
@@ -854,7 +866,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
}
- private void ensureActive() {
+ private void ensureClient() {
if (this instanceof MockExtendedClient) {
return;
}
@@ -886,7 +898,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
public void checkMapping(String index) {
- ensureActive();
+ ensureClient();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap> map = getMappingsResponse.getMappings();
@@ -902,25 +914,24 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private void checkMapping(String index, String type, MappingMetaData mappingMetaData) {
try {
- SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
- SearchResponse searchResponse = searchRequestBuilder.setSize(0)
- .setIndices(index)
- .setTypes(type)
- .setQuery(QueryBuilders.matchAllQuery())
- .execute()
- .actionGet();
+ SearchSourceBuilder builder = new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(0);
+ SearchRequest searchRequest = new SearchRequest()
+ .indices(index)
+ .source(builder);
+ SearchResponse searchResponse =
+ client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
long total = searchResponse.getHits().getTotalHits();
if (total > 0L) {
Map fields = new TreeMap<>();
Map root = mappingMetaData.getSourceAsMap();
- checkMapping(index, type, "", "", root, fields);
+ checkMapping(index, "", "", root, fields);
AtomicInteger empty = new AtomicInteger();
Map map = sortByValue(fields);
map.forEach((key, value) -> {
logger.info("{} {} {}",
- key,
- value,
- (double) value * 100 / total);
+ key, value, (double) value * 100 / total);
if (value == 0) {
empty.incrementAndGet();
}
@@ -934,7 +945,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@SuppressWarnings("unchecked")
- private void checkMapping(String index, String type,
+ private void checkMapping(String index,
String pathDef, String fieldName, Map map,
Map fields) {
String path = pathDef;
@@ -959,18 +970,19 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
String fieldType = o instanceof String ? o.toString() : null;
// do not recurse into our custom field mapper
if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) {
- checkMapping(index, type, path, key, child, fields);
+ checkMapping(index, path, key, child, fields);
}
} else if ("type".equals(key)) {
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder);
- SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
- SearchResponse searchResponse = searchRequestBuilder.setSize(0)
- .setIndices(index)
- .setTypes(type)
- .setQuery(queryBuilder)
- .execute()
- .actionGet();
+ SearchSourceBuilder builder = new SearchSourceBuilder()
+ .query(queryBuilder)
+ .size(0);
+ SearchRequest searchRequest = new SearchRequest()
+ .indices(index)
+ .source(builder);
+ SearchResponse searchResponse =
+ client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
fields.put(path, searchResponse.getHits().getTotalHits());
}
}
@@ -978,7 +990,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private static > Map sortByValue(Map map) {
Map result = new LinkedHashMap<>();
- map.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getValue))
+ map.entrySet().stream().sorted(Map.Entry.comparingByValue())
.forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
return result;
}
@@ -1062,6 +1074,42 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
}
+ private static class FailPruneResult implements IndexPruneResult {
+
+ List candidateIndices;
+
+ List indicesToDelete;
+
+ DeleteIndexResponse response;
+
+ FailPruneResult(List candidateIndices, List indicesToDelete,
+ DeleteIndexResponse response) {
+ this.candidateIndices = candidateIndices;
+ this.indicesToDelete = indicesToDelete;
+ this.response = response;
+ }
+
+ @Override
+ public IndexPruneResult.State getState() {
+ return IndexPruneResult.State.FAIL;
+ }
+
+ @Override
+ public List getCandidateIndices() {
+ return candidateIndices;
+ }
+
+ @Override
+ public List getDeletedIndices() {
+ return indicesToDelete;
+ }
+
+ @Override
+ public boolean isAcknowledged() {
+ return response.isAcknowledged();
+ }
+ }
+
private static class NothingToDoPruneResult implements IndexPruneResult {
List candidateIndices;
diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java
index ba9150f..e77a55a 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java
@@ -1,5 +1,8 @@
package org.xbib.elx.common;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
@@ -16,6 +19,8 @@ import java.util.ServiceLoader;
@SuppressWarnings("rawtypes")
public class ClientBuilder {
+ private static final Logger logger = LogManager.getLogger(ClientBuilder.class);
+
private final ElasticsearchClient client;
private final Settings.Builder settingsBuilder;
@@ -97,6 +102,10 @@ public class ClientBuilder {
if (provider == null) {
throw new IllegalArgumentException("no provider");
}
- return (C) providerMap.get(provider).getExtendedClient().setClient(client).init(settingsBuilder.build());
+ Settings settings = settingsBuilder.build();
+ logger.log(Level.INFO, "settings = " + settings.toDelimitedString(','));
+ return (C) providerMap.get(provider).getExtendedClient()
+ .setClient(client)
+ .init(settings);
}
}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java
index dbd7fa6..41c3e8b 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java
@@ -2,9 +2,6 @@ package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@@ -12,6 +9,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkController;
+import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.ExtendedClient;
@@ -33,27 +31,23 @@ public class DefaultBulkController implements BulkController {
private final BulkMetric bulkMetric;
+ private BulkProcessor bulkProcessor;
+
private final List indexNames;
private final Map startBulkRefreshIntervals;
private final Map stopBulkRefreshIntervals;
- private long maxWaitTime;
+ private final long maxWaitTime;
- private TimeUnit maxWaitTimeUnit;
+ private final TimeUnit maxWaitTimeUnit;
- private BulkProcessor bulkProcessor;
+ private final AtomicBoolean active;
- private BulkListener bulkListener;
-
- private AtomicBoolean active;
-
- private boolean enableBulkLogging;
-
- public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
+ public DefaultBulkController(ExtendedClient client) {
this.client = client;
- this.bulkMetric = bulkMetric;
+ this.bulkMetric = new DefaultBulkMetric();
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
@@ -62,9 +56,14 @@ public class DefaultBulkController implements BulkController {
this.maxWaitTimeUnit = TimeUnit.SECONDS;
}
+ @Override
+ public BulkMetric getBulkMetric() {
+ return bulkMetric;
+ }
+
@Override
public Throwable getLastBulkError() {
- return bulkListener.getLastBulkError();
+ return bulkProcessor.getBulkListener().getLastBulkError();
}
@Override
@@ -78,22 +77,27 @@ public class DefaultBulkController implements BulkController {
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest"));
- this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
+ boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
Parameters.ENABLE_BULK_LOGGING.getValue());
- this.bulkListener = new BulkListener();
+ BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest)
.build();
- this.active.set(true);
if (logger.isInfoEnabled()) {
- logger.info("bulk processor set up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
+ logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {}, bulk logging = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest,
enableBulkLogging);
}
+ this.active.set(true);
+ }
+
+ @Override
+ public void inactivate() {
+ this.active.set(false);
}
@Override
@@ -118,65 +122,53 @@ public class DefaultBulkController implements BulkController {
}
@Override
- public void index(IndexRequest indexRequest) {
+ public void bulkIndex(IndexRequest indexRequest) {
ensureActiveAndBulk();
- if (!active.get()) {
- throw new IllegalStateException("inactive");
- }
try {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
- }
+ bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkProcessor.add(indexRequest);
} catch (Exception e) {
- bulkListener.lastBulkError = e;
- active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of index failed: " + e.getMessage(), e);
}
+ inactivate();
}
}
@Override
- public void delete(DeleteRequest deleteRequest) {
+ public void bulkDelete(DeleteRequest deleteRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
- }
+ bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
- bulkListener.lastBulkError = e;
- active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of delete failed: " + e.getMessage(), e);
}
+ inactivate();
}
}
@Override
- public void update(UpdateRequest updateRequest) {
+ public void bulkUpdate(UpdateRequest updateRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
- }
+ bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
bulkProcessor.add(updateRequest);
} catch (Exception e) {
- bulkListener.lastBulkError = e;
- active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of update failed: " + e.getMessage(), e);
}
+ inactivate();
}
}
@Override
- public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
+ public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
try {
return bulkProcessor.awaitFlush(timeout, timeUnit);
} catch (InterruptedException e) {
@@ -195,7 +187,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
flush();
- if (waitForResponses(timeout, timeUnit)) {
+ if (waitForBulkResponses(timeout, timeUnit)) {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
@@ -217,6 +209,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void close() throws IOException {
flush();
+ bulkMetric.close();
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
@@ -238,92 +231,5 @@ public class DefaultBulkController implements BulkController {
if (bulkProcessor == null) {
throw new UnsupportedOperationException("bulk processor not present");
}
- if (bulkListener == null) {
- throw new UnsupportedOperationException("bulk listener not present");
- }
- }
-
- private class BulkListener implements DefaultBulkProcessor.Listener {
-
- private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
-
- private Throwable lastBulkError = null;
-
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- long l = 0;
- if (bulkMetric != null) {
- l = bulkMetric.getCurrentIngest().getCount();
- bulkMetric.getCurrentIngest().inc();
- int n = request.numberOfActions();
- bulkMetric.getSubmitted().inc(n);
- bulkMetric.getCurrentIngestNumDocs().inc(n);
- bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
- }
- if (enableBulkLogging && logger.isDebugEnabled()) {
- logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
- executionId,
- request.numberOfActions(),
- request.estimatedSizeInBytes(),
- l);
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- long l = 0;
- if (bulkMetric != null) {
- l = bulkMetric.getCurrentIngest().getCount();
- bulkMetric.getCurrentIngest().dec();
- bulkMetric.getSucceeded().inc(response.getItems().length);
- }
- int n = 0;
- for (BulkItemResponse itemResponse : response.getItems()) {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
- }
- if (itemResponse.isFailed()) {
- n++;
- if (bulkMetric != null) {
- bulkMetric.getSucceeded().dec(1);
- bulkMetric.getFailed().inc(1);
- }
- }
- }
- if (enableBulkLogging && bulkMetric != null && logger.isDebugEnabled()) {
- logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
- executionId,
- bulkMetric.getSucceeded().getCount(),
- bulkMetric.getFailed().getCount(),
- response.getTook().millis(),
- l);
- }
- if (n > 0) {
- if (enableBulkLogging && logger.isErrorEnabled()) {
- logger.error("bulk [{}] failed with {} failed items, failure message = {}",
- executionId, n, response.buildFailureMessage());
- }
- } else {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
- }
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- if (bulkMetric != null) {
- bulkMetric.getCurrentIngest().dec();
- }
- lastBulkError = failure;
- active.set(false);
- if (enableBulkLogging && logger.isErrorEnabled()) {
- logger.error("after bulk [" + executionId + "] error", failure);
- }
- }
-
- Throwable getLastBulkError() {
- return lastBulkError;
- }
}
}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java
new file mode 100644
index 0000000..bb40ba5
--- /dev/null
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java
@@ -0,0 +1,109 @@
+package org.xbib.elx.common;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.xbib.elx.api.BulkController;
+import org.xbib.elx.api.BulkListener;
+import org.xbib.elx.api.BulkMetric;
+
+class DefaultBulkListener implements BulkListener {
+
+ private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
+
+ private final BulkController bulkController;
+
+ private final BulkMetric bulkMetric;
+
+ private final boolean isBulkLoggingEnabled;
+
+ private Throwable lastBulkError = null;
+
+ public DefaultBulkListener(BulkController bulkController,
+ BulkMetric bulkMetric,
+ boolean isBulkLoggingEnabled) {
+ this.bulkController = bulkController;
+ this.bulkMetric = bulkMetric;
+ this.isBulkLoggingEnabled = isBulkLoggingEnabled;
+ }
+
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ long l = 0;
+ if (bulkMetric != null) {
+ l = bulkMetric.getCurrentIngest().getCount();
+ bulkMetric.getCurrentIngest().inc();
+ int n = request.numberOfActions();
+ bulkMetric.getSubmitted().inc(n);
+ bulkMetric.getCurrentIngestNumDocs().inc(n);
+ bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
+ }
+ if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
+ logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
+ executionId,
+ request.numberOfActions(),
+ request.estimatedSizeInBytes(),
+ l);
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ long l = 0;
+ if (bulkMetric != null) {
+ l = bulkMetric.getCurrentIngest().getCount();
+ bulkMetric.getCurrentIngest().dec();
+ bulkMetric.getSucceeded().inc(response.getItems().length);
+ }
+ int n = 0;
+ for (BulkItemResponse itemResponse : response.getItems()) {
+ if (bulkMetric != null) {
+ bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
+ }
+ if (itemResponse.isFailed()) {
+ n++;
+ if (bulkMetric != null) {
+ bulkMetric.getSucceeded().dec(1);
+ bulkMetric.getFailed().inc(1);
+ }
+ }
+ }
+ if (isBulkLoggingEnabled && bulkMetric != null && logger.isDebugEnabled()) {
+ logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
+ executionId,
+ bulkMetric.getSucceeded().getCount(),
+ bulkMetric.getFailed().getCount(),
+ response.getTook().millis(),
+ l);
+ }
+ if (n > 0) {
+ if (isBulkLoggingEnabled && logger.isErrorEnabled()) {
+ logger.error("bulk [{}] failed with {} failed items, failure message = {}",
+ executionId, n, response.buildFailureMessage());
+ }
+ } else {
+ if (bulkMetric != null) {
+ bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
+ }
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ if (bulkMetric != null) {
+ bulkMetric.getCurrentIngest().dec();
+ }
+ lastBulkError = failure;
+ if (logger.isErrorEnabled()) {
+ logger.error("after bulk [" + executionId + "] error", failure);
+ }
+ bulkController.inactivate();
+ }
+
+ @Override
+ public Throwable getLastBulkError() {
+ return lastBulkError;
+ }
+}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java
index 8127e29..1350e65 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java
@@ -1,6 +1,5 @@
package org.xbib.elx.common;
-import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@@ -37,10 +36,6 @@ public class DefaultBulkMetric implements BulkMetric {
submitted = new CountMetric();
succeeded = new CountMetric();
failed = new CountMetric();
- }
-
- @Override
- public void init(Settings settings) {
start();
}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java
index 74915c9..e815b83 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java
@@ -11,7 +11,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkProcessor;
+import org.xbib.elx.api.BulkRequestHandler;
import java.util.Objects;
import java.util.concurrent.Executors;
@@ -29,6 +31,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DefaultBulkProcessor implements BulkProcessor {
+ private final BulkListener bulkListener;
+
private final int bulkActions;
private final long bulkSize;
@@ -45,16 +49,22 @@ public class DefaultBulkProcessor implements BulkProcessor {
private volatile boolean closed;
- private DefaultBulkProcessor(ElasticsearchClient client, Listener listener, String name, int concurrentRequests,
- int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
+ private DefaultBulkProcessor(ElasticsearchClient client,
+ BulkListener bulkListener,
+ String name,
+ int concurrentRequests,
+ int bulkActions,
+ ByteSizeValue bulkSize,
+ TimeValue flushInterval) {
+ this.bulkListener = bulkListener;
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
- new SyncBulkRequestHandler(client, listener) :
- new AsyncBulkRequestHandler(client, listener, concurrentRequests);
+ new SyncBulkRequestHandler(client, bulkListener) :
+ new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests);
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
EsExecutors.daemonThreadFactory(name != null ? "[" + name + "]" : "" + "bulk_processor"));
@@ -68,12 +78,18 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
- public static Builder builder(ElasticsearchClient client, Listener listener) {
+ public static Builder builder(ElasticsearchClient client,
+ BulkListener listener) {
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null");
return new Builder(client, listener);
}
+ @Override
+ public BulkListener getBulkListener() {
+ return bulkListener;
+ }
+
/**
* Wait for bulk request handler with flush.
* @param timeout the timeout value
@@ -136,20 +152,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
@SuppressWarnings("rawtypes")
@Override
public DefaultBulkProcessor add(ActionRequest request) {
- return add(request, null);
- }
-
- /**
- * Adds either a delete or an index request with a payload.
- *
- * @param request request
- * @param payload payload
- * @return his bulk processor
- */
- @SuppressWarnings("rawtypes")
- @Override
- public DefaultBulkProcessor add(ActionRequest request, Object payload) {
- internalAdd(request, payload);
+ internalAdd(request);
return this;
}
@@ -183,9 +186,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
- private synchronized void internalAdd(ActionRequest> request, Object payload) {
+ private synchronized void internalAdd(ActionRequest> request) {
ensureOpen();
- bulkRequest.add(request, payload);
+ bulkRequest.add(request);
executeIfNeeded();
}
@@ -217,7 +220,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
- private final Listener listener;
+ private final BulkListener bulkListener;
private String name;
@@ -233,12 +236,12 @@ public class DefaultBulkProcessor implements BulkProcessor {
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
*
- * @param client the client
- * @param listener the listener
+ * @param client the client
+ * @param bulkListener the listener
*/
- Builder(ElasticsearchClient client, Listener listener) {
+ Builder(ElasticsearchClient client, BulkListener bulkListener) {
this.client = client;
- this.listener = listener;
+ this.bulkListener = bulkListener;
}
/**
@@ -308,7 +311,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
* @return a bulk processor
*/
public DefaultBulkProcessor build() {
- return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
+ return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
@@ -332,10 +335,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
- private final DefaultBulkProcessor.Listener listener;
+ private final BulkListener listener;
- SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) {
- Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null");
+ SyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener) {
this.client = client;
this.listener = listener;
}
@@ -365,14 +367,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
- private final DefaultBulkProcessor.Listener listener;
+ private final BulkListener listener;
private final Semaphore semaphore;
private final int concurrentRequests;
- private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
- Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null");
+ private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener, int concurrentRequests) {
this.client = client;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
@@ -413,7 +414,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
} catch (Exception e) {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
- if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
+ if (!bulkRequestSetupSuccessful && acquired) {
+ // if we fail on client.bulk() release the semaphore
semaphore.release();
}
}
diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java
index 4f91c14..647894b 100644
--- a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java
+++ b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java
@@ -29,7 +29,7 @@ public class MockExtendedClient extends AbstractExtendedClient {
}
@Override
- protected void closeClient() {
+ protected void closeClient() {
}
@Override
diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java
index 928dbe6..5c559bb 100644
--- a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java
+++ b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java
@@ -52,7 +52,8 @@ class AliasTest {
// get alias
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY);
long t0 = System.nanoTime();
- GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
+ GetAliasesResponse getAliasesResponse =
+ client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
long t1 = (System.nanoTime() - t0) / 1000000;
logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
assertTrue(t1 >= 0);
diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java
index 2b82483..2bc1f44 100644
--- a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java
+++ b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java
@@ -33,7 +33,7 @@ class SearchTest {
ElasticsearchClient client = helper.client("1");
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1; i++) {
- IndexRequest indexRequest = new IndexRequest("pages", "row")
+ IndexRequest indexRequest = new IndexRequest().index("pages").type("row")
.source(XContentFactory.jsonBuilder()
.startObject()
.field("user1", "joerg")
diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java
index 1ee6bfb..cb56250 100644
--- a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java
+++ b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java
@@ -30,7 +30,7 @@ class SimpleTest {
}
@Test
- void test() throws Exception {
+ void testSimple() throws Exception {
try {
DeleteIndexRequest deleteIndexRequest =
new DeleteIndexRequest().indices("test");
@@ -39,7 +39,6 @@ class SimpleTest {
// ignore if index not found
}
Settings indexSettings = Settings.settingsBuilder()
- .put(helper.getNodeSettings())
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java
index 6186f8c..139f560 100644
--- a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java
+++ b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java
@@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-
-import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
@@ -47,20 +46,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
- private Map nodes = new HashMap<>();
-
- private Map clients = new HashMap<>();
-
- private String home;
-
- private String cluster;
-
- private String host;
-
- private int port;
-
private static final String key = "es-instance";
+ private static final AtomicInteger count = new AtomicInteger(0);
+
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@@ -73,20 +62,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
- setHome(System.getProperty("path.home") + "/" + getRandomString(8));
- setClusterName("test-cluster-" + System.getProperty("user.name"));
- return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
+ // initialize new helper here, increase counter
+ return extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
}
@Override
- public void beforeEach(ExtensionContext context) throws Exception {
- Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
- logger.info("starting cluster");
+ public void beforeEach(ExtensionContext extensionContext) throws Exception {
+ Helper helper = extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
+ logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress();
+ String host = null;
+ int port = 0;
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
@@ -111,42 +103,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
@Override
- public void afterEach(ExtensionContext context) throws Exception {
- closeNodes();
- deleteFiles(Paths.get(getHome() + "/data"));
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ Helper helper = extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
+ closeNodes(helper);
+ deleteFiles(Paths.get(helper.getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
- private void setClusterName(String cluster) {
- this.cluster = cluster;
- }
-
- private String getClusterName() {
- return cluster;
- }
-
- private void setHome(String home) {
- this.home = home;
- }
-
- private String getHome() {
- return home;
- }
-
- private void closeNodes() {
+ private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
- for (AbstractClient client : clients.values()) {
+ for (AbstractClient client : helper.clients.values()) {
client.close();
}
- clients.clear();
logger.info("closing all nodes");
- for (Node node : nodes.values()) {
+ for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
- nodes.clear();
logger.info("all nodes closed");
}
@@ -168,23 +144,42 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
}
- private String getRandomString(int len) {
- final char[] buf = new char[len];
- final int n = numbersAndLetters.length - 1;
- for (int i = 0; i < buf.length; i++) {
- buf[i] = numbersAndLetters[random.nextInt(n)];
- }
- return new String(buf);
- }
-
private Helper create() {
- return new Helper();
+ Helper helper = new Helper();
+ helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
+ helper.setClusterName("test-cluster-" + helper.randomString(8));
+ logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
+ return helper;
}
- class Helper {
+ static class Helper {
+
+ String home;
+
+ String cluster;
+
+ Map nodes = new HashMap<>();
+
+ Map clients = new HashMap<>();
+
+ void setHome(String home) {
+ this.home = home;
+ }
+
+ String getHome() {
+ return home;
+ }
+
+ void setClusterName(String cluster) {
+ this.cluster = cluster;
+ }
+
+ String getClusterName() {
+ return cluster;
+ }
Settings getNodeSettings() {
- return settingsBuilder()
+ return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
@@ -198,20 +193,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return clients.get(id);
}
- String randomString(int n) {
- return getRandomString(n);
+ String randomString(int len) {
+ final char[] buf = new char[len];
+ final int n = numbersAndLetters.length - 1;
+ for (int i = 0; i < buf.length; i++) {
+ buf[i] = numbersAndLetters[random.nextInt(n)];
+ }
+ return new String(buf);
}
private Node buildNode(String id) {
- Settings nodeSettings = settingsBuilder()
+ Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
- .put("name", id)
+ .put("node.name", id)
.build();
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
- logger.info("clients={}", clients);
return node;
}
}
diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java
index 4f298aa..39beedb 100644
--- a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java
+++ b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java
@@ -45,18 +45,19 @@ class WildcardTest {
}
private void index(ElasticsearchClient client, String id, String fieldValue) throws IOException {
- client.execute(IndexAction.INSTANCE, new IndexRequest("index", "type", id)
- .source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())).actionGet();
+ client.execute(IndexAction.INSTANCE, new IndexRequest().index("index").type("type").id(id)
+ .source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject()))
+ .actionGet();
client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet();
}
private long count(ElasticsearchClient client, QueryBuilder queryBuilder) {
- SearchSourceBuilder builder = new SearchSourceBuilder();
- builder.query(queryBuilder);
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices("index");
- searchRequest.types("type");
- searchRequest.source(builder);
+ SearchSourceBuilder builder = new SearchSourceBuilder()
+ .query(queryBuilder);
+ SearchRequest searchRequest = new SearchRequest()
+ .indices("index")
+ .types("type")
+ .source(builder);
return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
}
diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java
index 5a27aff..ab5f171 100644
--- a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java
+++ b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java
@@ -1 +1 @@
-package org.xbib.elx.common.test;
\ No newline at end of file
+package org.xbib.elx.common.test;
diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java
index f922bcc..3733275 100644
--- a/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java
+++ b/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java
@@ -94,7 +94,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
- assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -122,7 +122,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
- assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -180,7 +180,7 @@ class ClientTest {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test5", 60L, TimeUnit.SECONDS);
- assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(maxthreads * actions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java
index 0ea421c..f2a1689 100644
--- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java
+++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java
@@ -63,7 +63,7 @@ class DuplicateIDTest {
logger.warn("skipping, no node available");
} finally {
client.close();
- assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java
index c9a7cd7..c0bab0d 100644
--- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java
+++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java
@@ -37,7 +37,7 @@ class SmokeTest {
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
- assertEquals(helper.getCluster(), client.getClusterName());
+ assertEquals(helper.getClusterName(), client.getClusterName());
client.checkMapping("test_smoke");
client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
client.flush();
@@ -54,8 +54,8 @@ class SmokeTest {
int replica = client.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
- assertEquals(0, client.getBulkMetric().getFailed().getCount());
- assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(0, client.getBulkController().getBulkMetric().getFailed().getCount());
+ assertEquals(4, client.getBulkController().getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java
index dcd0e31..e394c9f 100644
--- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java
+++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java
@@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-
-import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
@@ -47,20 +46,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
- private Map nodes = new HashMap<>();
-
- private Map clients = new HashMap<>();
-
- private String home;
-
- private String cluster;
-
- private String host;
-
- private int port;
-
private static final String key = "es-instance";
+ private static final AtomicInteger count = new AtomicInteger(0);
+
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@@ -73,20 +62,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
- setHome(System.getProperty("path.home") + "/" + getRandomString(8));
- setClusterName("test-cluster-" + System.getProperty("user.name"));
- return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
+ // initialize new helper here, increase counter
+ return extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
}
@Override
- public void beforeEach(ExtensionContext context) throws Exception {
- Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
- logger.info("starting cluster");
+ public void beforeEach(ExtensionContext extensionContext) throws Exception {
+ Helper helper = extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
+ logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress();
+ String host = null;
+ int port = 0;
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
@@ -111,42 +103,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
@Override
- public void afterEach(ExtensionContext context) throws Exception {
- closeNodes();
- deleteFiles(Paths.get(getHome() + "/data"));
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ Helper helper = extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
+ closeNodes(helper);
+ deleteFiles(Paths.get(helper.getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
- private void setClusterName(String cluster) {
- this.cluster = cluster;
- }
-
- private String getClusterName() {
- return cluster;
- }
-
- private void setHome(String home) {
- this.home = home;
- }
-
- private String getHome() {
- return home;
- }
-
- private void closeNodes() {
+ private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
- for (AbstractClient client : clients.values()) {
+ for (AbstractClient client : helper.clients.values()) {
client.close();
}
- clients.clear();
logger.info("closing all nodes");
- for (Node node : nodes.values()) {
+ for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
- nodes.clear();
logger.info("all nodes closed");
}
@@ -168,20 +144,46 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
}
- private String getRandomString(int len) {
- final char[] buf = new char[len];
- final int n = numbersAndLetters.length - 1;
- for (int i = 0; i < buf.length; i++) {
- buf[i] = numbersAndLetters[random.nextInt(n)];
- }
- return new String(buf);
- }
-
private Helper create() {
- return new Helper();
+ Helper helper = new Helper();
+ helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
+ helper.setClusterName("test-cluster-" + helper.randomString(8));
+ logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
+ return helper;
}
- class Helper {
+ static class Helper {
+
+ String home;
+
+ String cluster;
+
+ Map nodes = new HashMap<>();
+
+ Map clients = new HashMap<>();
+
+ void setHome(String home) {
+ this.home = home;
+ }
+
+ String getHome() {
+ return home;
+ }
+
+ void setClusterName(String cluster) {
+ this.cluster = cluster;
+ }
+
+ String getClusterName() {
+ return cluster;
+ }
+
+ Settings getNodeSettings() {
+ return Settings.builder()
+ .put("cluster.name", getClusterName())
+ .put("path.home", getHome())
+ .build();
+ }
void startNode(String id) {
buildNode(id).start();
@@ -191,25 +193,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return clients.get(id);
}
- String getCluster() {
- return getClusterName();
- }
-
- String randomString(int n) {
- return getRandomString(n);
+ String randomString(int len) {
+ final char[] buf = new char[len];
+ final int n = numbersAndLetters.length - 1;
+ for (int i = 0; i < buf.length; i++) {
+ buf[i] = numbersAndLetters[random.nextInt(n)];
+ }
+ return new String(buf);
}
private Node buildNode(String id) {
- Settings nodeSettings = settingsBuilder()
- .put("cluster.name", getClusterName())
- .put("path.home", getHome())
- .put("name", id)
+ Settings nodeSettings = Settings.builder()
+ .put(getNodeSettings())
+ .put("node.name", id)
.build();
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
- logger.info("clients={}", clients);
return node;
}
}
diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java
index 73c0f81..409f38f 100644
--- a/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java
+++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java
@@ -97,7 +97,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
- assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -126,7 +126,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
- assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -183,7 +183,7 @@ class ClientTest {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test5", 60L, TimeUnit.SECONDS);
- assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(maxthreads * maxloop, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java
index 64207e7..37149f6 100644
--- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java
+++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java
@@ -64,7 +64,7 @@ class DuplicateIDTest {
logger.warn("skipping, no node available");
} finally {
client.close();
- assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java
index be4a87b..74d8dee 100644
--- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java
+++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java
@@ -38,7 +38,7 @@ class SmokeTest extends TestExtension {
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
- assertEquals(helper.getCluster(), client.getClusterName());
+ assertEquals(helper.getClusterName(), client.getClusterName());
client.checkMapping("test_smoke");
client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
client.flush();
@@ -56,8 +56,8 @@ class SmokeTest extends TestExtension {
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
- assertEquals(0, client.getBulkMetric().getFailed().getCount());
- assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
+ assertEquals(0, client.getBulkController().getBulkMetric().getFailed().getCount());
+ assertEquals(4, client.getBulkController().getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java
index e37ca8c..7867e42 100644
--- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java
+++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java
@@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-
-import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
@@ -47,19 +46,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
- private Map nodes = new HashMap<>();
+ private static final String key = "es-instance";
- private Map clients = new HashMap<>();
-
- private String home;
-
- private String cluster;
-
- private String host;
-
- private int port;
-
- private static final String key = "es-test-instance";
+ private static final AtomicInteger count = new AtomicInteger(0);
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@@ -73,15 +62,16 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
- setHome(System.getProperty("path.home") + "/" + getRandomString(8));
- setClusterName("test-cluster-" + System.getProperty("user.name"));
- return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
+ // initialize new helper here, increase counter
+ return extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
}
@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
- Helper helper = extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
- logger.info("starting cluster");
+ Helper helper = extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
+ logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
@@ -89,8 +79,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.publishAddress();
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
- host = address.address().getHostName();
- port = address.address().getPort();
+ helper.host = address.address().getHostName();
+ helper.port = address.address().getPort();
}
try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
@@ -107,46 +97,30 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
- logger.info("host = {} port = {}", host, port);
+ logger.info("host = {} port = {}", helper.host, helper.port);
}
@Override
- public void afterEach(ExtensionContext context) throws Exception {
- closeNodes();
- deleteFiles(Paths.get(getHome() + "/data"));
- logger.info("data files wiped: " + getHome());
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ Helper helper = extensionContext.getParent().get().getStore(ns)
+ .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
+ closeNodes(helper);
+ deleteFiles(Paths.get(helper.getHome() + "/data"));
+ logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
- private void setClusterName(String cluster) {
- this.cluster = cluster;
- }
-
- private String getClusterName() {
- return cluster;
- }
-
- private void setHome(String home) {
- this.home = home;
- }
-
- private String getHome() {
- return home;
- }
-
- private void closeNodes() {
+ private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
- for (AbstractClient client : clients.values()) {
+ for (AbstractClient client : helper.clients.values()) {
client.close();
}
- clients.clear();
logger.info("closing all nodes");
- for (Node node : nodes.values()) {
+ for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
- nodes.clear();
logger.info("all nodes closed");
}
@@ -168,34 +142,57 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
}
- private String getRandomString(int len) {
- final char[] buf = new char[len];
- final int n = numbersAndLetters.length - 1;
- for (int i = 0; i < buf.length; i++) {
- buf[i] = numbersAndLetters[random.nextInt(n)];
- }
- return new String(buf);
- }
-
private Helper create() {
- return new Helper();
+ Helper helper = new Helper();
+ helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
+ helper.setClusterName("test-cluster-" + helper.randomString(8));
+ logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
+ return helper;
}
- class Helper {
+ static class Helper {
+
+ String home;
+
+ String cluster;
+
+ String host;
+
+ int port;
+
+ Map nodes = new HashMap<>();
+
+ Map clients = new HashMap<>();
+
+ void setHome(String home) {
+ this.home = home;
+ }
+
+ String getHome() {
+ return home;
+ }
+
+ void setClusterName(String cluster) {
+ this.cluster = cluster;
+ }
+
+ String getClusterName() {
+ return cluster;
+ }
Settings getNodeSettings() {
- return settingsBuilder()
+ return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
}
Settings getTransportSettings() {
- return settingsBuilder()
+ return Settings.builder()
+ .put("cluster.name", cluster)
+ .put("path.home", getHome())
.put("host", host)
.put("port", port)
- .put("cluster.name", getClusterName())
- .put("path.home", getHome() + "/transport")
.build();
}
@@ -207,24 +204,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return clients.get(id);
}
- String getCluster() {
- return getClusterName();
- }
-
- String randomString(int n) {
- return getRandomString(n);
+ String randomString(int len) {
+ final char[] buf = new char[len];
+ final int n = numbersAndLetters.length - 1;
+ for (int i = 0; i < buf.length; i++) {
+ buf[i] = numbersAndLetters[random.nextInt(n)];
+ }
+ return new String(buf);
}
private Node buildNode(String id) {
- Settings nodeSettings = settingsBuilder()
+ Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
- .put("name", id)
+ .put("node.name", id)
.build();
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
- logger.info("clients={}", clients);
return node;
}
}
diff --git a/gradle.properties b/gradle.properties
index 4b70d38..4ed757c 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,6 +1,6 @@
group = org.xbib
name = elx
-version = 2.2.1.15
+version = 2.2.1.16
# main
xbib-metrics.version = 2.0.0
@@ -8,7 +8,7 @@ xbib-metrics.version = 2.0.0
xbib-guice.version = 4.4.2
# guava 18 -> our guava 28.1
xbib-guava.version = 28.1
-xbib-netty-http.version = 4.1.48.0
+xbib-netty-http.version = 4.1.49.0
elasticsearch.version = 2.2.1
#jackson 2.6.7 (original ES version) -> 2.9.10
jackson.version = 2.9.10
@@ -17,8 +17,5 @@ log4j.version = 2.12.1
mustache.version = 0.9.5
jts.version = 1.13
-# test
-junit.version = 5.5.1
-
-# docs
+junit.version = 5.6.2
asciidoclet.version = 1.5.4