From 539c77cbd4e1e649f2013c8b3d5fadd21bbf7ca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Mon, 11 May 2020 23:49:55 +0200 Subject: [PATCH] update to elx API of 7.6 branch --- .travis.yml | 9 - build.gradle | 21 +- .../java/org/xbib/elx/api/BulkController.java | 13 +- .../java/org/xbib/elx/api/BulkListener.java | 43 +++ .../java/org/xbib/elx/api/BulkMetric.java | 3 - .../java/org/xbib/elx/api/BulkProcessor.java | 49 +--- .../org/xbib/elx/api/BulkRequestHandler.java | 11 + .../java/org/xbib/elx/api/ExtendedClient.java | 15 +- .../org/xbib/elx/api/IndexPruneResult.java | 2 +- .../org/xbib/elx/api/ReadClientProvider.java | 1 - .../elx/common/AbstractExtendedClient.java | 256 +++++++++++------- .../org/xbib/elx/common/ClientBuilder.java | 11 +- .../elx/common/DefaultBulkController.java | 164 +++-------- .../xbib/elx/common/DefaultBulkListener.java | 109 ++++++++ .../xbib/elx/common/DefaultBulkMetric.java | 5 - .../xbib/elx/common/DefaultBulkProcessor.java | 70 ++--- .../xbib/elx/common/MockExtendedClient.java | 2 +- .../org/xbib/elx/common/test/AliasTest.java | 3 +- .../org/xbib/elx/common/test/SearchTest.java | 2 +- .../org/xbib/elx/common/test/SimpleTest.java | 3 +- .../xbib/elx/common/test/TestExtension.java | 121 ++++----- .../xbib/elx/common/test/WildcardTest.java | 17 +- .../xbib/elx/common/test/package-info.java | 2 +- .../org/xbib/elx/node/test/ClientTest.java | 6 +- .../xbib/elx/node/test/DuplicateIDTest.java | 2 +- .../org/xbib/elx/node/test/SmokeTest.java | 6 +- .../org/xbib/elx/node/test/TestExtension.java | 133 ++++----- .../xbib/elx/transport/test/ClientTest.java | 6 +- .../elx/transport/test/DuplicateIDTest.java | 2 +- .../xbib/elx/transport/test/SmokeTest.java | 6 +- .../elx/transport/test/TestExtension.java | 139 +++++----- gradle.properties | 9 +- 32 files changed, 650 insertions(+), 591 deletions(-) create mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkListener.java create mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java diff --git a/.travis.yml b/.travis.yml index 0c75adf..94d2a22 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,3 @@ language: java -sudo: required jdk: - openjdk11 -cache: - directories: - - $HOME/.m2 -after_success: - - ./gradlew sonarqube -Dsonar.host.url=https://sonarqube.com -Dsonar.login=$SONAR_TOKEN -env: - global: - secure: n1Ai4q/yMLn/Pg5pA4lTavoJoe7mQYB1PSKnZAqwbgyla94ySzK6iyBCBiNs/foMPisB/x+DHvmUXTsjvquw9Ay48ZITCV3xhcWzD0eZM2TMoG19CpRAEe8L8LNuYiti9k89ijDdUGZ5ifsvQNTGNHksouayAuApC3PrTUejJfR6SYrp1ZsQTbsMlr+4XU3p7QknK5rGgOwATIMP28F+bVnB05WJtlJA3b0SeucCurn3wJ4FGBQXRYmdlT7bQhNE4QgZM1VzcUFD/K0TBxzzq/otb/lNRSifyoekktDmJwQnaT9uQ4R8R6KdQ2Kb38Rvgjur+TKm5i1G8qS2+6LnIxQJG1aw3JvKK6W0wWCgnAVVRrXaCLday9NuY59tuh1mfjQ10UcsMNKcTdcKEMrLow506wSETcXc7L/LEnneWQyJJeV4vhPqR7KJfsBbeqgz3yIfsCn1GZVWFlfegzYCN52YTl0Y0uRD2Z+TnzQu+Bf4DzaWXLge1rz31xkhyeNNspub4h024+XqBjcMm6M9mlMzmmK8t2DIwPy/BlQbFBUyhrxziuR/5/2NEDPyHltvWkRb4AUIa25WJqkV0gTBegbMadZ9DyOo6Ea7aoVFBae2WGR08F1kzABsWrd1S7UJmWxW35iyMEtoAIayXphIK98qO5aCutwZ+3iOQazxbAs= diff --git a/build.gradle b/build.gradle index 2f8e691..60e581e 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,11 @@ plugins { id "org.sonarqube" version "2.8" id "io.codearte.nexus-staging" version "0.21.1" id "com.github.spotbugs" version "2.0.1" - id "org.xbib.gradle.plugin.asciidoctor" version "1.5.6.0.1" + id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.1" +} + +if (JavaVersion.current() < JavaVersion.VERSION_11) { + throw new GradleException("This build must be run with Java/OpenJDK 11+") } subprojects { @@ -19,10 +23,10 @@ subprojects { dependencies { testImplementation "org.junit.jupiter:junit-jupiter-api:${project.property('junit.version')}" - testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}" testImplementation "org.apache.logging.log4j:log4j-core:${project.property('log4j.version')}" testImplementation "org.apache.logging.log4j:log4j-jul:${project.property('log4j.version')}" testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:${project.property('log4j.version')}" + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}" asciidoclet "org.xbib:asciidoclet:${project.property('asciidoclet.version')}" } @@ -46,14 +50,11 @@ subprojects { test { enabled = true useJUnitPlatform() - // we MUST use this hack because of Elasticsearch 2.2.1 Lucene 5.4.1 MMapDirectory unmap() hackery - doFirst { - jvmArgs = [ - '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED', - '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED', - '--add-opens=java.base/java.nio=ALL-UNNAMED' - ] - } + jvmArgs = [ + '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED', + '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED', + '--add-opens=java.base/java.nio=ALL-UNNAMED' + ] systemProperty 'java.util.logging.manager', 'org.apache.logging.log4j.jul.LogManager' systemProperty 'jna.debug_load', 'true' systemProperty 'path.home', "${project.buildDir}" diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java index 69906ca..ec375eb 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -14,6 +14,10 @@ public interface BulkController extends Closeable, Flushable { void init(Settings settings); + void inactivate(); + + BulkMetric getBulkMetric(); + Throwable getLastBulkError(); void startBulkMode(IndexDefinition indexDefinition) throws IOException; @@ -21,16 +25,15 @@ public interface BulkController extends Closeable, Flushable { void startBulkMode(String indexName, long startRefreshIntervalInSeconds, long stopRefreshIntervalInSeconds) throws IOException; - void index(IndexRequest indexRequest); + void bulkIndex(IndexRequest indexRequest); - void delete(DeleteRequest deleteRequest); + void bulkDelete(DeleteRequest deleteRequest); - void update(UpdateRequest updateRequest); + void bulkUpdate(UpdateRequest updateRequest); - boolean waitForResponses(long timeout, TimeUnit timeUnit); + boolean waitForBulkResponses(long timeout, TimeUnit timeUnit); void stopBulkMode(IndexDefinition indexDefinition) throws IOException; void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException; - } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java new file mode 100644 index 0000000..9caa339 --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java @@ -0,0 +1,43 @@ +package org.xbib.elx.api; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; + +public interface BulkListener { + + /** + * Callback before the bulk is executed. + * + * @param executionId execution ID + * @param request request + */ + void beforeBulk(long executionId, BulkRequest request); + + /** + * Callback after a successful execution of bulk request. + * + * @param executionId execution ID + * @param request request + * @param response response + */ + void afterBulk(long executionId, BulkRequest request, BulkResponse response); + + /** + * Callback after a failed execution of bulk request. + * + * Note that in case an instance of InterruptedException is passed, which means that request + * processing has been + * cancelled externally, the thread's interruption status has been restored prior to calling this method. + * + * @param executionId execution ID + * @param request request + * @param failure failure + */ + void afterBulk(long executionId, BulkRequest request, Throwable failure); + + /** + * Get the last bulk error. + * @return the last bulk error + */ + Throwable getLastBulkError(); +} diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index af825e5..7e84376 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,6 +1,5 @@ package org.xbib.elx.api; -import org.elasticsearch.common.settings.Settings; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -8,8 +7,6 @@ import java.io.Closeable; public interface BulkMetric extends Closeable { - void init(Settings settings); - Metered getTotalIngest(); Count getTotalIngestSizeInBytes(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index cb994e0..73cc462 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -1,8 +1,6 @@ package org.xbib.elx.api; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import java.io.Closeable; import java.io.Flushable; @@ -13,54 +11,9 @@ public interface BulkProcessor extends Closeable, Flushable { @SuppressWarnings("rawtypes") BulkProcessor add(ActionRequest request); - @SuppressWarnings("rawtypes") - BulkProcessor add(ActionRequest request, Object payload); - boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; - interface BulkRequestHandler { - - void execute(BulkRequest bulkRequest, long executionId); - - boolean close(long timeout, TimeUnit unit) throws InterruptedException; - - } - - /** - * A listener for the execution. - */ - interface Listener { - - /** - * Callback before the bulk is executed. - * - * @param executionId execution ID - * @param request request - */ - void beforeBulk(long executionId, BulkRequest request); - - /** - * Callback after a successful execution of bulk request. - * - * @param executionId execution ID - * @param request request - * @param response response - */ - void afterBulk(long executionId, BulkRequest request, BulkResponse response); - - /** - * Callback after a failed execution of bulk request. - * - * Note that in case an instance of InterruptedException is passed, which means that request - * processing has been - * cancelled externally, the thread's interruption status has been restored prior to calling this method. - * - * @param executionId execution ID - * @param request request - * @param failure failure - */ - void afterBulk(long executionId, BulkRequest request, Throwable failure); - } + BulkListener getBulkListener(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java new file mode 100644 index 0000000..1bc3886 --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java @@ -0,0 +1,11 @@ +package org.xbib.elx.api; + +import org.elasticsearch.action.bulk.BulkRequest; +import java.util.concurrent.TimeUnit; + +public interface BulkRequestHandler { + + void execute(BulkRequest bulkRequest, long executionId); + + boolean close(long timeout, TimeUnit unit) throws InterruptedException; +} diff --git a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java index 0ec8639..6114621 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java @@ -6,6 +6,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.Closeable; import java.io.Flushable; @@ -34,12 +35,6 @@ public interface ExtendedClient extends Flushable, Closeable { */ ElasticsearchClient getClient(); - /** - * Get bulk metric. - * @return the bulk metric - */ - BulkMetric getBulkMetric(); - /** * Get buulk control. * @return the bulk control @@ -190,6 +185,8 @@ public interface ExtendedClient extends Flushable, Closeable { */ ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException; + ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException; + /** * Create a new index. * @@ -199,7 +196,7 @@ public interface ExtendedClient extends Flushable, Closeable { * @return this * @throws IOException if settings/mapping is invalid or index creation fails */ - ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException; + ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException; /** * Create a new index. @@ -317,7 +314,7 @@ public interface ExtendedClient extends Flushable, Closeable { /** * Force segment merge of an index. - * @param indexDefinition th eindex definition + * @param indexDefinition the index definition * @return this */ boolean forceMerge(IndexDefinition indexDefinition); @@ -398,7 +395,7 @@ public interface ExtendedClient extends Flushable, Closeable { String resolveMostRecentIndex(String alias); /** - * Get all aliases. + * Get all index aliases. * @param index the index * @return map of index aliases */ diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java index 0c118f8..a4ef207 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java @@ -4,7 +4,7 @@ import java.util.List; public interface IndexPruneResult { - enum State { NOTHING_TO_DO, SUCCESS, NONE }; + enum State { NOTHING_TO_DO, SUCCESS, NONE, FAIL }; State getState(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java b/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java index 6640686..bc0eb16 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java +++ b/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java @@ -1,6 +1,5 @@ package org.xbib.elx.api; -@FunctionalInterface public interface ReadClientProvider { C getReadClient(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java index dea431c..8ccb489 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java @@ -2,6 +2,7 @@ package org.xbib.elx.common; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; @@ -58,6 +59,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -67,7 +69,6 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.ExtendedClient; import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexDefinition; @@ -85,7 +86,6 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -108,22 +108,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient { private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName()); - /** - * The one and only index type name used in the extended client. - * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_". - */ - private static final String TYPE_NAME = "doc"; - - /** - * The Elasticsearch client. - */ private ElasticsearchClient client; - private BulkMetric bulkMetric; - private BulkController bulkController; - private AtomicBoolean closed; + private final AtomicBoolean closed; private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() { @Override @@ -178,11 +167,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return client; } - @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - @Override public BulkController getBulkController() { return bulkController; @@ -190,15 +174,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public AbstractExtendedClient init(Settings settings) throws IOException { + logger.info("initializing with settings = " + settings.toDelimitedString(',')); if (client == null) { client = createClient(settings); } - if (bulkMetric == null) { - this.bulkMetric = new DefaultBulkMetric(); - this.bulkMetric.init(settings); - } if (bulkController == null) { - this.bulkController = new DefaultBulkController(this, bulkMetric); + this.bulkController = new DefaultBulkController(this); bulkController.init(settings); } return this; @@ -213,12 +194,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public void close() throws IOException { - ensureActive(); + ensureClient(); if (closed.compareAndSet(false, true)) { - if (bulkMetric != null) { - logger.info("closing bulk metric"); - bulkMetric.close(); - } if (bulkController != null) { logger.info("closing bulk controller"); bulkController.close(); @@ -229,9 +206,9 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public String getClusterName() { - ensureActive(); + ensureClient(); try { - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); + ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear(); ClusterStateResponse clusterStateResponse = client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); return clusterStateResponse.getClusterName().value(); @@ -249,7 +226,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException { - ensureActive(); + ensureClient(); waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); URL indexSettings = indexDefinition.getSettingsUrl(); if (indexSettings == null) { @@ -284,7 +261,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient newIndex(String index) throws IOException { - return newIndex(index, Settings.EMPTY, (Map) null); + return newIndex(index, Settings.EMPTY, (Map) null); } @Override @@ -296,7 +273,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient newIndex(String index, Settings settings) throws IOException { - return newIndex(index, settings, (Map) null); + return newIndex(index, settings, (Map) null); } @Override @@ -306,8 +283,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient newIndex(String index, Settings settings, Map mapping) { - ensureActive(); + public ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) { + ensureClient(); if (index == null) { logger.warn("no index name given to create index"); return this; @@ -317,11 +294,35 @@ public abstract class AbstractExtendedClient implements ExtendedClient { createIndexRequest.settings(settings); } if (mapping != null) { - createIndexRequest.mapping(TYPE_NAME, mapping); + createIndexRequest.mapping("doc", mapping); } CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); - logger.info("index {} created: {}", index, createIndexResponse); - return this; + if (createIndexResponse.isAcknowledged()) { + return this; + } + throw new IllegalStateException("index creation not acknowledged: " + index); + } + + + @Override + public ExtendedClient newIndex(String index, Settings settings, Map mapping) { + ensureClient(); + if (index == null) { + logger.warn("no index name given to create index"); + return this; + } + CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index); + if (settings != null) { + createIndexRequest.settings(settings); + } + if (mapping != null) { + createIndexRequest.mapping("doc", mapping); + } + CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); + if (createIndexResponse.isAcknowledged()) { + return this; + } + throw new IllegalStateException("index creation not acknowledged: " + index); } @Override @@ -331,7 +332,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient deleteIndex(String index) { - ensureActive(); + ensureClient(); if (index == null) { logger.warn("no index name given to delete index"); return this; @@ -351,7 +352,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException { if (bulkController != null) { - ensureActive(); + ensureClient(); bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); } return this; @@ -360,7 +361,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException { if (bulkController != null) { - ensureActive(); + ensureClient(); bulkController.stopBulkMode(indexDefinition); } return this; @@ -369,7 +370,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { if (bulkController != null) { - ensureActive(); + ensureClient(); bulkController.stopBulkMode(index, timeout, timeUnit); } return this; @@ -377,63 +378,63 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient index(String index, String id, boolean create, String source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) + return index(new IndexRequest().index(index).type("doc").id(id).create(create) .source(source.getBytes(StandardCharsets.UTF_8))); } @Override public ExtendedClient index(String index, String id, boolean create, BytesReference source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) + return index(new IndexRequest().index(index).type("doc").id(id).create(create) .source(source)); } @Override public ExtendedClient index(IndexRequest indexRequest) { - ensureActive(); - bulkController.index(indexRequest); + ensureClient(); + bulkController.bulkIndex(indexRequest); return this; } @Override public ExtendedClient delete(String index, String id) { - return delete(new DeleteRequest(index, TYPE_NAME, id)); + return delete(new DeleteRequest().index(index).type("doc").id(id)); } @Override public ExtendedClient delete(DeleteRequest deleteRequest) { - ensureActive(); - bulkController.delete(deleteRequest); + ensureClient(); + bulkController.bulkDelete(deleteRequest); return this; } @Override public ExtendedClient update(String index, String id, BytesReference source) { - return update(new UpdateRequest(index, TYPE_NAME, id) + return update(new UpdateRequest().index(index).type("doc").id(id) .doc(source)); } @Override public ExtendedClient update(String index, String id, String source) { - return update(new UpdateRequest(index, TYPE_NAME, id) + return update(new UpdateRequest().index(index).type("doc").id(id) .doc(source.getBytes(StandardCharsets.UTF_8))); } @Override public ExtendedClient update(UpdateRequest updateRequest) { - ensureActive(); - bulkController.update(updateRequest); + ensureClient(); + bulkController.bulkUpdate(updateRequest); return this; } @Override public boolean waitForResponses(long timeout, TimeUnit timeUnit) { - ensureActive(); - return bulkController.waitForResponses(timeout, timeUnit); + ensureClient(); + return bulkController.waitForBulkResponses(timeout, timeUnit); } @Override public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { - ensureActive(); + ensureClient(); ensureIndexGiven(index); GetSettingsRequest settingsRequest = new GetSettingsRequest(); settingsRequest.indices(index); @@ -448,7 +449,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { - logger.error("timeout waiting for recovery"); + logger.warn("timeout waiting for recovery"); return false; } } @@ -457,15 +458,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { - ensureActive(); + ensureClient(); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { - if (logger.isErrorEnabled()) { - logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name()); - } + logger.warn("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name()); return false; } return true; @@ -473,7 +472,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { - ensureActive(); + ensureClient(); try { TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, @@ -530,7 +529,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient flushIndex(String index) { if (index != null) { - ensureActive(); + ensureClient(); client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet(); } return this; @@ -539,7 +538,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient refreshIndex(String index) { if (index != null) { - ensureActive(); + ensureClient(); client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); } return this; @@ -550,7 +549,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (alias == null) { return null; } - ensureActive(); + ensureClient(); GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias); GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); @@ -574,9 +573,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public String resolveAlias(String alias) { - ensureActive(); + ensureClient(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.blocks(false); clusterStateRequest.metaData(true); + clusterStateRequest.nodes(false); + clusterStateRequest.routingTable(false); + clusterStateRequest.customs(false); ClusterStateResponse clusterStateResponse = client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); SortedMap map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup(); @@ -612,7 +615,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases, IndexAliasAdder adder) { - ensureActive(); + ensureClient(); if (index == null) { return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to } @@ -706,11 +709,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (index.equals(fullIndexName)) { return EMPTY_INDEX_PRUNE_RESULT; } - ensureActive(); + ensureClient(); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); - logger.info("{} indices", getIndexResponse.getIndices().length); + logger.info("pruneIndex: total of {} indices", getIndexResponse.getIndices().length); List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); @@ -746,20 +749,29 @@ public abstract class AbstractExtendedClient implements ExtendedClient { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest() .indices(indicesToDelete.toArray(s)); DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); - return new SuccessPruneResult(candidateIndices, indicesToDelete, response); + if (response.isAcknowledged()) { + logger.log(Level.INFO, "deletion of {} acknowledged, waiting for GREEN", Arrays.asList(s)); + waitForCluster("GREEN", 30L, TimeUnit.SECONDS); + return new SuccessPruneResult(candidateIndices, indicesToDelete, response); + } else { + logger.log(Level.WARN, "deletion of {} not acknowledged", Arrays.asList(s)); + return new FailPruneResult(candidateIndices, indicesToDelete, response); + } } @Override public Long mostRecentDocument(String index, String timestampfieldname) { - ensureActive(); - SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.field(timestampfieldname); - sourceBuilder.size(1); - sourceBuilder.sort(sort); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(index); - searchRequest.source(sourceBuilder); + ensureClient(); + SortBuilder sort = SortBuilders + .fieldSort(timestampfieldname) + .order(SortOrder.DESC); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() + .sort(sort) + .field(timestampfieldname) + .size(1); + SearchRequest searchRequest = new SearchRequest() + .indices(index) + .source(sourceBuilder); SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); if (searchResponse.getHits().getHits().length == 1) { SearchHit hit = searchResponse.getHits().getHits()[0]; @@ -837,7 +849,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { - ensureActive(); + ensureClient(); if (index == null) { throw new IOException("no index name given"); } @@ -854,7 +866,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } - private void ensureActive() { + private void ensureClient() { if (this instanceof MockExtendedClient) { return; } @@ -886,7 +898,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } public void checkMapping(String index) { - ensureActive(); + ensureClient(); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); ImmutableOpenMap> map = getMappingsResponse.getMappings(); @@ -902,25 +914,24 @@ public abstract class AbstractExtendedClient implements ExtendedClient { private void checkMapping(String index, String type, MappingMetaData mappingMetaData) { try { - SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); - SearchResponse searchResponse = searchRequestBuilder.setSize(0) - .setIndices(index) - .setTypes(type) - .setQuery(QueryBuilders.matchAllQuery()) - .execute() - .actionGet(); + SearchSourceBuilder builder = new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(0); + SearchRequest searchRequest = new SearchRequest() + .indices(index) + .source(builder); + SearchResponse searchResponse = + client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); long total = searchResponse.getHits().getTotalHits(); if (total > 0L) { Map fields = new TreeMap<>(); Map root = mappingMetaData.getSourceAsMap(); - checkMapping(index, type, "", "", root, fields); + checkMapping(index, "", "", root, fields); AtomicInteger empty = new AtomicInteger(); Map map = sortByValue(fields); map.forEach((key, value) -> { logger.info("{} {} {}", - key, - value, - (double) value * 100 / total); + key, value, (double) value * 100 / total); if (value == 0) { empty.incrementAndGet(); } @@ -934,7 +945,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @SuppressWarnings("unchecked") - private void checkMapping(String index, String type, + private void checkMapping(String index, String pathDef, String fieldName, Map map, Map fields) { String path = pathDef; @@ -959,18 +970,19 @@ public abstract class AbstractExtendedClient implements ExtendedClient { String fieldType = o instanceof String ? o.toString() : null; // do not recurse into our custom field mapper if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) { - checkMapping(index, type, path, key, child, fields); + checkMapping(index, path, key, child, fields); } } else if ("type".equals(key)) { QueryBuilder filterBuilder = QueryBuilders.existsQuery(path); QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder); - SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); - SearchResponse searchResponse = searchRequestBuilder.setSize(0) - .setIndices(index) - .setTypes(type) - .setQuery(queryBuilder) - .execute() - .actionGet(); + SearchSourceBuilder builder = new SearchSourceBuilder() + .query(queryBuilder) + .size(0); + SearchRequest searchRequest = new SearchRequest() + .indices(index) + .source(builder); + SearchResponse searchResponse = + client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); fields.put(path, searchResponse.getHits().getTotalHits()); } } @@ -978,7 +990,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { private static > Map sortByValue(Map map) { Map result = new LinkedHashMap<>(); - map.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getValue)) + map.entrySet().stream().sorted(Map.Entry.comparingByValue()) .forEachOrdered(e -> result.put(e.getKey(), e.getValue())); return result; } @@ -1062,6 +1074,42 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } } + private static class FailPruneResult implements IndexPruneResult { + + List candidateIndices; + + List indicesToDelete; + + DeleteIndexResponse response; + + FailPruneResult(List candidateIndices, List indicesToDelete, + DeleteIndexResponse response) { + this.candidateIndices = candidateIndices; + this.indicesToDelete = indicesToDelete; + this.response = response; + } + + @Override + public IndexPruneResult.State getState() { + return IndexPruneResult.State.FAIL; + } + + @Override + public List getCandidateIndices() { + return candidateIndices; + } + + @Override + public List getDeletedIndices() { + return indicesToDelete; + } + + @Override + public boolean isAcknowledged() { + return response.isAcknowledged(); + } + } + private static class NothingToDoPruneResult implements IndexPruneResult { List candidateIndices; diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index ba9150f..e77a55a 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -1,5 +1,8 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; @@ -16,6 +19,8 @@ import java.util.ServiceLoader; @SuppressWarnings("rawtypes") public class ClientBuilder { + private static final Logger logger = LogManager.getLogger(ClientBuilder.class); + private final ElasticsearchClient client; private final Settings.Builder settingsBuilder; @@ -97,6 +102,10 @@ public class ClientBuilder { if (provider == null) { throw new IllegalArgumentException("no provider"); } - return (C) providerMap.get(provider).getExtendedClient().setClient(client).init(settingsBuilder.build()); + Settings settings = settingsBuilder.build(); + logger.log(Level.INFO, "settings = " + settings.toDelimitedString(',')); + return (C) providerMap.get(provider).getExtendedClient() + .setClient(client) + .init(settings); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index dbd7fa6..41c3e8b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -2,9 +2,6 @@ package org.xbib.elx.common; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -12,6 +9,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.ExtendedClient; @@ -33,27 +31,23 @@ public class DefaultBulkController implements BulkController { private final BulkMetric bulkMetric; + private BulkProcessor bulkProcessor; + private final List indexNames; private final Map startBulkRefreshIntervals; private final Map stopBulkRefreshIntervals; - private long maxWaitTime; + private final long maxWaitTime; - private TimeUnit maxWaitTimeUnit; + private final TimeUnit maxWaitTimeUnit; - private BulkProcessor bulkProcessor; + private final AtomicBoolean active; - private BulkListener bulkListener; - - private AtomicBoolean active; - - private boolean enableBulkLogging; - - public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) { + public DefaultBulkController(ExtendedClient client) { this.client = client; - this.bulkMetric = bulkMetric; + this.bulkMetric = new DefaultBulkMetric(); this.indexNames = new ArrayList<>(); this.active = new AtomicBoolean(false); this.startBulkRefreshIntervals = new HashMap<>(); @@ -62,9 +56,14 @@ public class DefaultBulkController implements BulkController { this.maxWaitTimeUnit = TimeUnit.SECONDS; } + @Override + public BulkMetric getBulkMetric() { + return bulkMetric; + } + @Override public Throwable getLastBulkError() { - return bulkListener.getLastBulkError(); + return bulkProcessor.getBulkListener().getLastBulkError(); } @Override @@ -78,22 +77,27 @@ public class DefaultBulkController implements BulkController { ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), "maxVolumePerRequest")); - this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), + boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), Parameters.ENABLE_BULK_LOGGING.getValue()); - this.bulkListener = new BulkListener(); + BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) .setFlushInterval(flushIngestInterval) .setBulkSize(maxVolumePerRequest) .build(); - this.active.set(true); if (logger.isInfoEnabled()) { - logger.info("bulk processor set up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + + logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + "flushIngestInterval = {} maxVolumePerRequest = {}, bulk logging = {}", maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest, enableBulkLogging); } + this.active.set(true); + } + + @Override + public void inactivate() { + this.active.set(false); } @Override @@ -118,65 +122,53 @@ public class DefaultBulkController implements BulkController { } @Override - public void index(IndexRequest indexRequest) { + public void bulkIndex(IndexRequest indexRequest) { ensureActiveAndBulk(); - if (!active.get()) { - throw new IllegalStateException("inactive"); - } try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); - } + bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); bulkProcessor.add(indexRequest); } catch (Exception e) { - bulkListener.lastBulkError = e; - active.set(false); if (logger.isErrorEnabled()) { logger.error("bulk add of index failed: " + e.getMessage(), e); } + inactivate(); } } @Override - public void delete(DeleteRequest deleteRequest) { + public void bulkDelete(DeleteRequest deleteRequest) { if (!active.get()) { throw new IllegalStateException("inactive"); } try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); - } + bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); bulkProcessor.add(deleteRequest); } catch (Exception e) { - bulkListener.lastBulkError = e; - active.set(false); if (logger.isErrorEnabled()) { logger.error("bulk add of delete failed: " + e.getMessage(), e); } + inactivate(); } } @Override - public void update(UpdateRequest updateRequest) { + public void bulkUpdate(UpdateRequest updateRequest) { if (!active.get()) { throw new IllegalStateException("inactive"); } try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); - } + bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); bulkProcessor.add(updateRequest); } catch (Exception e) { - bulkListener.lastBulkError = e; - active.set(false); if (logger.isErrorEnabled()) { logger.error("bulk add of update failed: " + e.getMessage(), e); } + inactivate(); } } @Override - public boolean waitForResponses(long timeout, TimeUnit timeUnit) { + public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { try { return bulkProcessor.awaitFlush(timeout, timeUnit); } catch (InterruptedException e) { @@ -195,7 +187,7 @@ public class DefaultBulkController implements BulkController { @Override public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException { flush(); - if (waitForResponses(timeout, timeUnit)) { + if (waitForBulkResponses(timeout, timeUnit)) { if (indexNames.contains(index)) { Long secs = stopBulkRefreshIntervals.get(index); if (secs != null && secs != 0L) { @@ -217,6 +209,7 @@ public class DefaultBulkController implements BulkController { @Override public void close() throws IOException { flush(); + bulkMetric.close(); if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { for (String index : indexNames) { Long secs = stopBulkRefreshIntervals.get(index); @@ -238,92 +231,5 @@ public class DefaultBulkController implements BulkController { if (bulkProcessor == null) { throw new UnsupportedOperationException("bulk processor not present"); } - if (bulkListener == null) { - throw new UnsupportedOperationException("bulk listener not present"); - } - } - - private class BulkListener implements DefaultBulkProcessor.Listener { - - private final Logger logger = LogManager.getLogger(BulkListener.class.getName()); - - private Throwable lastBulkError = null; - - @Override - public void beforeBulk(long executionId, BulkRequest request) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().inc(); - int n = request.numberOfActions(); - bulkMetric.getSubmitted().inc(n); - bulkMetric.getCurrentIngestNumDocs().inc(n); - bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); - } - if (enableBulkLogging && logger.isDebugEnabled()) { - logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", - executionId, - request.numberOfActions(), - request.estimatedSizeInBytes(), - l); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().dec(); - bulkMetric.getSucceeded().inc(response.getItems().length); - } - int n = 0; - for (BulkItemResponse itemResponse : response.getItems()) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); - } - if (itemResponse.isFailed()) { - n++; - if (bulkMetric != null) { - bulkMetric.getSucceeded().dec(1); - bulkMetric.getFailed().inc(1); - } - } - } - if (enableBulkLogging && bulkMetric != null && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", - executionId, - bulkMetric.getSucceeded().getCount(), - bulkMetric.getFailed().getCount(), - response.getTook().millis(), - l); - } - if (n > 0) { - if (enableBulkLogging && logger.isErrorEnabled()) { - logger.error("bulk [{}] failed with {} failed items, failure message = {}", - executionId, n, response.buildFailureMessage()); - } - } else { - if (bulkMetric != null) { - bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); - } - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(); - } - lastBulkError = failure; - active.set(false); - if (enableBulkLogging && logger.isErrorEnabled()) { - logger.error("after bulk [" + executionId + "] error", failure); - } - } - - Throwable getLastBulkError() { - return lastBulkError; - } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java new file mode 100644 index 0000000..bb40ba5 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -0,0 +1,109 @@ +package org.xbib.elx.common; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkListener; +import org.xbib.elx.api.BulkMetric; + +class DefaultBulkListener implements BulkListener { + + private final Logger logger = LogManager.getLogger(BulkListener.class.getName()); + + private final BulkController bulkController; + + private final BulkMetric bulkMetric; + + private final boolean isBulkLoggingEnabled; + + private Throwable lastBulkError = null; + + public DefaultBulkListener(BulkController bulkController, + BulkMetric bulkMetric, + boolean isBulkLoggingEnabled) { + this.bulkController = bulkController; + this.bulkMetric = bulkMetric; + this.isBulkLoggingEnabled = isBulkLoggingEnabled; + } + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + long l = 0; + if (bulkMetric != null) { + l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().inc(); + int n = request.numberOfActions(); + bulkMetric.getSubmitted().inc(n); + bulkMetric.getCurrentIngestNumDocs().inc(n); + bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); + } + if (isBulkLoggingEnabled && logger.isDebugEnabled()) { + logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", + executionId, + request.numberOfActions(), + request.estimatedSizeInBytes(), + l); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + long l = 0; + if (bulkMetric != null) { + l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().dec(); + bulkMetric.getSucceeded().inc(response.getItems().length); + } + int n = 0; + for (BulkItemResponse itemResponse : response.getItems()) { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); + } + if (itemResponse.isFailed()) { + n++; + if (bulkMetric != null) { + bulkMetric.getSucceeded().dec(1); + bulkMetric.getFailed().inc(1); + } + } + } + if (isBulkLoggingEnabled && bulkMetric != null && logger.isDebugEnabled()) { + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", + executionId, + bulkMetric.getSucceeded().getCount(), + bulkMetric.getFailed().getCount(), + response.getTook().millis(), + l); + } + if (n > 0) { + if (isBulkLoggingEnabled && logger.isErrorEnabled()) { + logger.error("bulk [{}] failed with {} failed items, failure message = {}", + executionId, n, response.buildFailureMessage()); + } + } else { + if (bulkMetric != null) { + bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); + } + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(); + } + lastBulkError = failure; + if (logger.isErrorEnabled()) { + logger.error("after bulk [" + executionId + "] error", failure); + } + bulkController.inactivate(); + } + + @Override + public Throwable getLastBulkError() { + return lastBulkError; + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 8127e29..1350e65 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,6 +1,5 @@ package org.xbib.elx.common; -import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -37,10 +36,6 @@ public class DefaultBulkMetric implements BulkMetric { submitted = new CountMetric(); succeeded = new CountMetric(); failed = new CountMetric(); - } - - @Override - public void init(Settings settings) { start(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 74915c9..e815b83 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -11,7 +11,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkProcessor; +import org.xbib.elx.api.BulkRequestHandler; import java.util.Objects; import java.util.concurrent.Executors; @@ -29,6 +31,8 @@ import java.util.concurrent.atomic.AtomicLong; */ public class DefaultBulkProcessor implements BulkProcessor { + private final BulkListener bulkListener; + private final int bulkActions; private final long bulkSize; @@ -45,16 +49,22 @@ public class DefaultBulkProcessor implements BulkProcessor { private volatile boolean closed; - private DefaultBulkProcessor(ElasticsearchClient client, Listener listener, String name, int concurrentRequests, - int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { + private DefaultBulkProcessor(ElasticsearchClient client, + BulkListener bulkListener, + String name, + int concurrentRequests, + int bulkActions, + ByteSizeValue bulkSize, + TimeValue flushInterval) { + this.bulkListener = bulkListener; this.executionIdGen = new AtomicLong(); this.closed = false; this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); this.bulkRequest = new BulkRequest(); this.bulkRequestHandler = concurrentRequests == 0 ? - new SyncBulkRequestHandler(client, listener) : - new AsyncBulkRequestHandler(client, listener, concurrentRequests); + new SyncBulkRequestHandler(client, bulkListener) : + new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests); if (flushInterval != null) { this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(name != null ? "[" + name + "]" : "" + "bulk_processor")); @@ -68,12 +78,18 @@ public class DefaultBulkProcessor implements BulkProcessor { } } - public static Builder builder(ElasticsearchClient client, Listener listener) { + public static Builder builder(ElasticsearchClient client, + BulkListener listener) { Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null"); Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null"); return new Builder(client, listener); } + @Override + public BulkListener getBulkListener() { + return bulkListener; + } + /** * Wait for bulk request handler with flush. * @param timeout the timeout value @@ -136,20 +152,7 @@ public class DefaultBulkProcessor implements BulkProcessor { @SuppressWarnings("rawtypes") @Override public DefaultBulkProcessor add(ActionRequest request) { - return add(request, null); - } - - /** - * Adds either a delete or an index request with a payload. - * - * @param request request - * @param payload payload - * @return his bulk processor - */ - @SuppressWarnings("rawtypes") - @Override - public DefaultBulkProcessor add(ActionRequest request, Object payload) { - internalAdd(request, payload); + internalAdd(request); return this; } @@ -183,9 +186,9 @@ public class DefaultBulkProcessor implements BulkProcessor { } } - private synchronized void internalAdd(ActionRequest request, Object payload) { + private synchronized void internalAdd(ActionRequest request) { ensureOpen(); - bulkRequest.add(request, payload); + bulkRequest.add(request); executeIfNeeded(); } @@ -217,7 +220,7 @@ public class DefaultBulkProcessor implements BulkProcessor { private final ElasticsearchClient client; - private final Listener listener; + private final BulkListener bulkListener; private String name; @@ -233,12 +236,12 @@ public class DefaultBulkProcessor implements BulkProcessor { * Creates a builder of bulk processor with the client to use and the listener that will be used * to be notified on the completion of bulk requests. * - * @param client the client - * @param listener the listener + * @param client the client + * @param bulkListener the listener */ - Builder(ElasticsearchClient client, Listener listener) { + Builder(ElasticsearchClient client, BulkListener bulkListener) { this.client = client; - this.listener = listener; + this.bulkListener = bulkListener; } /** @@ -308,7 +311,7 @@ public class DefaultBulkProcessor implements BulkProcessor { * @return a bulk processor */ public DefaultBulkProcessor build() { - return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); + return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); } } @@ -332,10 +335,9 @@ public class DefaultBulkProcessor implements BulkProcessor { private final ElasticsearchClient client; - private final DefaultBulkProcessor.Listener listener; + private final BulkListener listener; - SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) { - Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null"); + SyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener) { this.client = client; this.listener = listener; } @@ -365,14 +367,13 @@ public class DefaultBulkProcessor implements BulkProcessor { private final ElasticsearchClient client; - private final DefaultBulkProcessor.Listener listener; + private final BulkListener listener; private final Semaphore semaphore; private final int concurrentRequests; - private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) { - Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null"); + private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener, int concurrentRequests) { this.client = client; this.listener = listener; this.concurrentRequests = concurrentRequests; @@ -413,7 +414,8 @@ public class DefaultBulkProcessor implements BulkProcessor { } catch (Exception e) { listener.afterBulk(executionId, bulkRequest, e); } finally { - if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore + if (!bulkRequestSetupSuccessful && acquired) { + // if we fail on client.bulk() release the semaphore semaphore.release(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java index 4f91c14..647894b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java @@ -29,7 +29,7 @@ public class MockExtendedClient extends AbstractExtendedClient { } @Override - protected void closeClient() { + protected void closeClient() { } @Override diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java index 928dbe6..5c559bb 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java @@ -52,7 +52,8 @@ class AliasTest { // get alias GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY); long t0 = System.nanoTime(); - GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); + GetAliasesResponse getAliasesResponse = + client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); long t1 = (System.nanoTime() - t0) / 1000000; logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); assertTrue(t1 >= 0); diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java index 2b82483..2bc1f44 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java @@ -33,7 +33,7 @@ class SearchTest { ElasticsearchClient client = helper.client("1"); BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE); for (int i = 0; i < 1; i++) { - IndexRequest indexRequest = new IndexRequest("pages", "row") + IndexRequest indexRequest = new IndexRequest().index("pages").type("row") .source(XContentFactory.jsonBuilder() .startObject() .field("user1", "joerg") diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java index 1ee6bfb..cb56250 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java @@ -30,7 +30,7 @@ class SimpleTest { } @Test - void test() throws Exception { + void testSimple() throws Exception { try { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices("test"); @@ -39,7 +39,6 @@ class SimpleTest { // ignore if index not found } Settings indexSettings = Settings.settingsBuilder() - .put(helper.getNodeSettings()) .put("index.analysis.analyzer.default.filter.0", "lowercase") .put("index.analysis.analyzer.default.filter.1", "trim") .put("index.analysis.analyzer.default.tokenizer", "keyword") diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java index 6186f8c..139f560 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java @@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; import java.util.Random; - -import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import java.util.concurrent.atomic.AtomicInteger; public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { @@ -47,20 +46,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - private Map nodes = new HashMap<>(); - - private Map clients = new HashMap<>(); - - private String home; - - private String cluster; - - private String host; - - private int port; - private static final String key = "es-instance"; + private static final AtomicInteger count = new AtomicInteger(0); + private static final ExtensionContext.Namespace ns = ExtensionContext.Namespace.create(TestExtension.class); @@ -73,20 +62,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - setHome(System.getProperty("path.home") + "/" + getRandomString(8)); - setClusterName("test-cluster-" + System.getProperty("user.name")); - return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create()); + // initialize new helper here, increase counter + return extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class); } @Override - public void beforeEach(ExtensionContext context) throws Exception { - Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); - logger.info("starting cluster"); + public void beforeEach(ExtensionContext extensionContext) throws Exception { + Helper helper = extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode("1"); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); Object obj = response.iterator().next().getTransport().getAddress() .publishAddress(); + String host = null; + int port = 0; if (obj instanceof InetSocketTransportAddress) { InetSocketTransportAddress address = (InetSocketTransportAddress) obj; host = address.address().getHostName(); @@ -111,42 +103,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } @Override - public void afterEach(ExtensionContext context) throws Exception { - closeNodes(); - deleteFiles(Paths.get(getHome() + "/data")); + public void afterEach(ExtensionContext extensionContext) throws Exception { + Helper helper = extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + closeNodes(helper); + deleteFiles(Paths.get(helper.getHome() + "/data")); logger.info("data files wiped"); Thread.sleep(2000L); // let OS commit changes } - private void setClusterName(String cluster) { - this.cluster = cluster; - } - - private String getClusterName() { - return cluster; - } - - private void setHome(String home) { - this.home = home; - } - - private String getHome() { - return home; - } - - private void closeNodes() { + private void closeNodes(Helper helper) throws IOException { logger.info("closing all clients"); - for (AbstractClient client : clients.values()) { + for (AbstractClient client : helper.clients.values()) { client.close(); } - clients.clear(); logger.info("closing all nodes"); - for (Node node : nodes.values()) { + for (Node node : helper.nodes.values()) { if (node != null) { node.close(); } } - nodes.clear(); logger.info("all nodes closed"); } @@ -168,23 +144,42 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } } - private String getRandomString(int len) { - final char[] buf = new char[len]; - final int n = numbersAndLetters.length - 1; - for (int i = 0; i < buf.length; i++) { - buf[i] = numbersAndLetters[random.nextInt(n)]; - } - return new String(buf); - } - private Helper create() { - return new Helper(); + Helper helper = new Helper(); + helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); + helper.setClusterName("test-cluster-" + helper.randomString(8)); + logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); + return helper; } - class Helper { + static class Helper { + + String home; + + String cluster; + + Map nodes = new HashMap<>(); + + Map clients = new HashMap<>(); + + void setHome(String home) { + this.home = home; + } + + String getHome() { + return home; + } + + void setClusterName(String cluster) { + this.cluster = cluster; + } + + String getClusterName() { + return cluster; + } Settings getNodeSettings() { - return settingsBuilder() + return Settings.builder() .put("cluster.name", getClusterName()) .put("path.home", getHome()) .build(); @@ -198,20 +193,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return clients.get(id); } - String randomString(int n) { - return getRandomString(n); + String randomString(int len) { + final char[] buf = new char[len]; + final int n = numbersAndLetters.length - 1; + for (int i = 0; i < buf.length; i++) { + buf[i] = numbersAndLetters[random.nextInt(n)]; + } + return new String(buf); } private Node buildNode(String id) { - Settings nodeSettings = settingsBuilder() + Settings nodeSettings = Settings.builder() .put(getNodeSettings()) - .put("name", id) + .put("node.name", id) .build(); Node node = new MockNode(nodeSettings); AbstractClient client = (AbstractClient) node.client(); nodes.put(id, node); clients.put(id, client); - logger.info("clients={}", clients); return node; } } diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java index 4f298aa..39beedb 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java @@ -45,18 +45,19 @@ class WildcardTest { } private void index(ElasticsearchClient client, String id, String fieldValue) throws IOException { - client.execute(IndexAction.INSTANCE, new IndexRequest("index", "type", id) - .source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())).actionGet(); + client.execute(IndexAction.INSTANCE, new IndexRequest().index("index").type("type").id(id) + .source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())) + .actionGet(); client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet(); } private long count(ElasticsearchClient client, QueryBuilder queryBuilder) { - SearchSourceBuilder builder = new SearchSourceBuilder(); - builder.query(queryBuilder); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices("index"); - searchRequest.types("type"); - searchRequest.source(builder); + SearchSourceBuilder builder = new SearchSourceBuilder() + .query(queryBuilder); + SearchRequest searchRequest = new SearchRequest() + .indices("index") + .types("type") + .source(builder); return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits(); } diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java index 5a27aff..ab5f171 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java @@ -1 +1 @@ -package org.xbib.elx.common.test; \ No newline at end of file +package org.xbib.elx.common.test; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java index f922bcc..3733275 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/ClientTest.java @@ -94,7 +94,7 @@ class ClientTest { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } @@ -122,7 +122,7 @@ class ClientTest { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } @@ -180,7 +180,7 @@ class ClientTest { logger.warn("skipping, no node available"); } finally { client.stopBulk("test5", 60L, TimeUnit.SECONDS); - assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(maxthreads * actions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index 0ea421c..f2a1689 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -63,7 +63,7 @@ class DuplicateIDTest { logger.warn("skipping, no node available"); } finally { client.close(); - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index c9a7cd7..c0bab0d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -37,7 +37,7 @@ class SmokeTest { client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest client.flush(); client.waitForResponses(30, TimeUnit.SECONDS); - assertEquals(helper.getCluster(), client.getClusterName()); + assertEquals(helper.getClusterName(), client.getClusterName()); client.checkMapping("test_smoke"); client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); client.flush(); @@ -54,8 +54,8 @@ class SmokeTest { int replica = client.getReplicaLevel(indexDefinition); assertEquals(2, replica); client.deleteIndex(indexDefinition); - assertEquals(0, client.getBulkMetric().getFailed().getCount()); - assertEquals(4, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, client.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(4, client.getBulkController().getBulkMetric().getSucceeded().getCount()); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index dcd0e31..e394c9f 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; import java.util.Random; - -import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import java.util.concurrent.atomic.AtomicInteger; public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { @@ -47,20 +46,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - private Map nodes = new HashMap<>(); - - private Map clients = new HashMap<>(); - - private String home; - - private String cluster; - - private String host; - - private int port; - private static final String key = "es-instance"; + private static final AtomicInteger count = new AtomicInteger(0); + private static final ExtensionContext.Namespace ns = ExtensionContext.Namespace.create(TestExtension.class); @@ -73,20 +62,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - setHome(System.getProperty("path.home") + "/" + getRandomString(8)); - setClusterName("test-cluster-" + System.getProperty("user.name")); - return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create()); + // initialize new helper here, increase counter + return extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class); } @Override - public void beforeEach(ExtensionContext context) throws Exception { - Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); - logger.info("starting cluster"); + public void beforeEach(ExtensionContext extensionContext) throws Exception { + Helper helper = extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode("1"); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); Object obj = response.iterator().next().getTransport().getAddress() .publishAddress(); + String host = null; + int port = 0; if (obj instanceof InetSocketTransportAddress) { InetSocketTransportAddress address = (InetSocketTransportAddress) obj; host = address.address().getHostName(); @@ -111,42 +103,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } @Override - public void afterEach(ExtensionContext context) throws Exception { - closeNodes(); - deleteFiles(Paths.get(getHome() + "/data")); + public void afterEach(ExtensionContext extensionContext) throws Exception { + Helper helper = extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + closeNodes(helper); + deleteFiles(Paths.get(helper.getHome() + "/data")); logger.info("data files wiped"); Thread.sleep(2000L); // let OS commit changes } - private void setClusterName(String cluster) { - this.cluster = cluster; - } - - private String getClusterName() { - return cluster; - } - - private void setHome(String home) { - this.home = home; - } - - private String getHome() { - return home; - } - - private void closeNodes() { + private void closeNodes(Helper helper) throws IOException { logger.info("closing all clients"); - for (AbstractClient client : clients.values()) { + for (AbstractClient client : helper.clients.values()) { client.close(); } - clients.clear(); logger.info("closing all nodes"); - for (Node node : nodes.values()) { + for (Node node : helper.nodes.values()) { if (node != null) { node.close(); } } - nodes.clear(); logger.info("all nodes closed"); } @@ -168,20 +144,46 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } } - private String getRandomString(int len) { - final char[] buf = new char[len]; - final int n = numbersAndLetters.length - 1; - for (int i = 0; i < buf.length; i++) { - buf[i] = numbersAndLetters[random.nextInt(n)]; - } - return new String(buf); - } - private Helper create() { - return new Helper(); + Helper helper = new Helper(); + helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); + helper.setClusterName("test-cluster-" + helper.randomString(8)); + logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); + return helper; } - class Helper { + static class Helper { + + String home; + + String cluster; + + Map nodes = new HashMap<>(); + + Map clients = new HashMap<>(); + + void setHome(String home) { + this.home = home; + } + + String getHome() { + return home; + } + + void setClusterName(String cluster) { + this.cluster = cluster; + } + + String getClusterName() { + return cluster; + } + + Settings getNodeSettings() { + return Settings.builder() + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .build(); + } void startNode(String id) { buildNode(id).start(); @@ -191,25 +193,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return clients.get(id); } - String getCluster() { - return getClusterName(); - } - - String randomString(int n) { - return getRandomString(n); + String randomString(int len) { + final char[] buf = new char[len]; + final int n = numbersAndLetters.length - 1; + for (int i = 0; i < buf.length; i++) { + buf[i] = numbersAndLetters[random.nextInt(n)]; + } + return new String(buf); } private Node buildNode(String id) { - Settings nodeSettings = settingsBuilder() - .put("cluster.name", getClusterName()) - .put("path.home", getHome()) - .put("name", id) + Settings nodeSettings = Settings.builder() + .put(getNodeSettings()) + .put("node.name", id) .build(); Node node = new MockNode(nodeSettings); AbstractClient client = (AbstractClient) node.client(); nodes.put(id, node); clients.put(id, client); - logger.info("clients={}", clients); return node; } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java index 73c0f81..409f38f 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/ClientTest.java @@ -97,7 +97,7 @@ class ClientTest { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } @@ -126,7 +126,7 @@ class ClientTest { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } @@ -183,7 +183,7 @@ class ClientTest { logger.warn("skipping, no node available"); } finally { client.stopBulk("test5", 60L, TimeUnit.SECONDS); - assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(maxthreads * maxloop, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 64207e7..37149f6 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -64,7 +64,7 @@ class DuplicateIDTest { logger.warn("skipping, no node available"); } finally { client.close(); - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index be4a87b..74d8dee 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -38,7 +38,7 @@ class SmokeTest extends TestExtension { client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest client.flush(); client.waitForResponses(30, TimeUnit.SECONDS); - assertEquals(helper.getCluster(), client.getClusterName()); + assertEquals(helper.getClusterName(), client.getClusterName()); client.checkMapping("test_smoke"); client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); client.flush(); @@ -56,8 +56,8 @@ class SmokeTest extends TestExtension { assertEquals(2, replica); client.deleteIndex(indexDefinition); - assertEquals(0, client.getBulkMetric().getFailed().getCount()); - assertEquals(4, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, client.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(4, client.getBulkController().getBulkMetric().getSucceeded().getCount()); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index e37ca8c..7867e42 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; import java.util.Random; - -import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import java.util.concurrent.atomic.AtomicInteger; public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { @@ -47,19 +46,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); - private Map nodes = new HashMap<>(); + private static final String key = "es-instance"; - private Map clients = new HashMap<>(); - - private String home; - - private String cluster; - - private String host; - - private int port; - - private static final String key = "es-test-instance"; + private static final AtomicInteger count = new AtomicInteger(0); private static final ExtensionContext.Namespace ns = ExtensionContext.Namespace.create(TestExtension.class); @@ -73,15 +62,16 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - setHome(System.getProperty("path.home") + "/" + getRandomString(8)); - setClusterName("test-cluster-" + System.getProperty("user.name")); - return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); + // initialize new helper here, increase counter + return extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class); } @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - Helper helper = extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class); - logger.info("starting cluster"); + Helper helper = extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode("1"); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); @@ -89,8 +79,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .publishAddress(); if (obj instanceof InetSocketTransportAddress) { InetSocketTransportAddress address = (InetSocketTransportAddress) obj; - host = address.address().getHostName(); - port = address.address().getPort(); + helper.host = address.address().getHostName(); + helper.port = address.address().getPort(); } try { ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, @@ -107,46 +97,30 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft ClusterStateResponse clusterStateResponse = helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); - logger.info("host = {} port = {}", host, port); + logger.info("host = {} port = {}", helper.host, helper.port); } @Override - public void afterEach(ExtensionContext context) throws Exception { - closeNodes(); - deleteFiles(Paths.get(getHome() + "/data")); - logger.info("data files wiped: " + getHome()); + public void afterEach(ExtensionContext extensionContext) throws Exception { + Helper helper = extensionContext.getParent().get().getStore(ns) + .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); + closeNodes(helper); + deleteFiles(Paths.get(helper.getHome() + "/data")); + logger.info("data files wiped"); Thread.sleep(2000L); // let OS commit changes } - private void setClusterName(String cluster) { - this.cluster = cluster; - } - - private String getClusterName() { - return cluster; - } - - private void setHome(String home) { - this.home = home; - } - - private String getHome() { - return home; - } - - private void closeNodes() { + private void closeNodes(Helper helper) throws IOException { logger.info("closing all clients"); - for (AbstractClient client : clients.values()) { + for (AbstractClient client : helper.clients.values()) { client.close(); } - clients.clear(); logger.info("closing all nodes"); - for (Node node : nodes.values()) { + for (Node node : helper.nodes.values()) { if (node != null) { node.close(); } } - nodes.clear(); logger.info("all nodes closed"); } @@ -168,34 +142,57 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } } - private String getRandomString(int len) { - final char[] buf = new char[len]; - final int n = numbersAndLetters.length - 1; - for (int i = 0; i < buf.length; i++) { - buf[i] = numbersAndLetters[random.nextInt(n)]; - } - return new String(buf); - } - private Helper create() { - return new Helper(); + Helper helper = new Helper(); + helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8)); + helper.setClusterName("test-cluster-" + helper.randomString(8)); + logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome()); + return helper; } - class Helper { + static class Helper { + + String home; + + String cluster; + + String host; + + int port; + + Map nodes = new HashMap<>(); + + Map clients = new HashMap<>(); + + void setHome(String home) { + this.home = home; + } + + String getHome() { + return home; + } + + void setClusterName(String cluster) { + this.cluster = cluster; + } + + String getClusterName() { + return cluster; + } Settings getNodeSettings() { - return settingsBuilder() + return Settings.builder() .put("cluster.name", getClusterName()) .put("path.home", getHome()) .build(); } Settings getTransportSettings() { - return settingsBuilder() + return Settings.builder() + .put("cluster.name", cluster) + .put("path.home", getHome()) .put("host", host) .put("port", port) - .put("cluster.name", getClusterName()) - .put("path.home", getHome() + "/transport") .build(); } @@ -207,24 +204,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return clients.get(id); } - String getCluster() { - return getClusterName(); - } - - String randomString(int n) { - return getRandomString(n); + String randomString(int len) { + final char[] buf = new char[len]; + final int n = numbersAndLetters.length - 1; + for (int i = 0; i < buf.length; i++) { + buf[i] = numbersAndLetters[random.nextInt(n)]; + } + return new String(buf); } private Node buildNode(String id) { - Settings nodeSettings = settingsBuilder() + Settings nodeSettings = Settings.builder() .put(getNodeSettings()) - .put("name", id) + .put("node.name", id) .build(); Node node = new MockNode(nodeSettings); AbstractClient client = (AbstractClient) node.client(); nodes.put(id, node); clients.put(id, client); - logger.info("clients={}", clients); return node; } } diff --git a/gradle.properties b/gradle.properties index 4b70d38..4ed757c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.15 +version = 2.2.1.16 # main xbib-metrics.version = 2.0.0 @@ -8,7 +8,7 @@ xbib-metrics.version = 2.0.0 xbib-guice.version = 4.4.2 # guava 18 -> our guava 28.1 xbib-guava.version = 28.1 -xbib-netty-http.version = 4.1.48.0 +xbib-netty-http.version = 4.1.49.0 elasticsearch.version = 2.2.1 #jackson 2.6.7 (original ES version) -> 2.9.10 jackson.version = 2.9.10 @@ -17,8 +17,5 @@ log4j.version = 2.12.1 mustache.version = 0.9.5 jts.version = 1.13 -# test -junit.version = 5.5.1 - -# docs +junit.version = 5.6.2 asciidoclet.version = 1.5.4