From 4701447d3c1e9c0bfa06b2d1e0ad4fe4c16b1a7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 19 Feb 2019 17:29:51 +0100 Subject: [PATCH] clean up extended client --- build.gradle | 2 +- .../java/org/xbib/elx/api/BulkControl.java | 2 + .../java/org/xbib/elx/api/ExtendedClient.java | 359 +++++---- .../org/xbib/elx/api}/IndexDefinition.java | 28 +- .../org/xbib/elx/api}/IndexRetention.java | 6 +- .../elx/common/AbstractExtendedClient.java | 733 ++++++++---------- .../org/xbib/elx/common/BulkProcessor.java | 42 +- .../xbib/elx/common/MockExtendedClient.java | 65 +- .../java/org/xbib/elx/common/Parameters.java | 2 +- .../xbib/elx/common/SimpleBulkControl.java | 32 +- .../elx/common/management/package-info.java | 1 - .../java/org/xbib/elx/common/AliasTest.java | 22 +- .../java/org/elasticsearch/node/MockNode.java | 4 - .../elx/node/ExtendeNodeDuplicateIDTest.java | 2 +- .../ExtendedNodeClientSingleNodeTest.java | 39 - .../xbib/elx/node/ExtendedNodeClientTest.java | 28 +- .../elx/node/ExtendedNodeIndexAliasTest.java | 8 +- .../elx/node/ExtendedNodeReplicaTest.java | 9 +- .../xbib/elx/node/ExtendedNodeSmokeTest.java | 69 ++ .../ExtendedNodeUpdateReplicaLevelTest.java | 10 +- .../java/org/xbib/elx/node/NodeTestUtils.java | 14 +- .../transport/ExtendedTransportClient.java | 26 +- ...ExtendedTransportClientSingleNodeTest.java | 4 +- .../ExtendedTransportClientTest.java | 63 +- .../ExtendedTransportDuplicateIDTest.java | 2 +- .../ExtendedTransportIndexAliasTest.java | 35 +- .../ExtendedTransportReplicaTest.java | 24 +- ...tendedTransportUpdateReplicaLevelTest.java | 10 +- gradle.properties | 2 +- gradle/sonarqube.gradle | 41 - 30 files changed, 882 insertions(+), 802 deletions(-) rename {elx-common/src/main/java/org/xbib/elx/common/management => elx-api/src/main/java/org/xbib/elx/api}/IndexDefinition.java (79%) rename {elx-common/src/main/java/org/xbib/elx/common/management => elx-api/src/main/java/org/xbib/elx/api}/IndexRetention.java (73%) delete mode 100644 elx-common/src/main/java/org/xbib/elx/common/management/package-info.java delete mode 100644 elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientSingleNodeTest.java create mode 100644 elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java delete mode 100644 gradle/sonarqube.gradle diff --git a/build.gradle b/build.gradle index 01e2104..090181a 100644 --- a/build.gradle +++ b/build.gradle @@ -67,7 +67,7 @@ subprojects { } clean { - delete "plugins" + delete "data" delete "logs" delete "out" } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java b/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java index e0fcf84..c43c137 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java @@ -16,4 +16,6 @@ public interface BulkControl { Map getStartBulkRefreshIntervals(); Map getStopBulkRefreshIntervals(); + + String getMaxWaitTime(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java index 4ba2496..9c2f0e1 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java @@ -10,17 +10,16 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; /** - * Interface for providing extended administrative methods for managing and ingesting Elasticsearch. + * Interface for extended managing and indexing methods of an Elasticsearch client. */ public interface ExtendedClient { /** * Set an Elasticsearch client to extend from it. May be null for TransportClient. * @param client client - * @return an ELasticsearch client + * @return this client */ ExtendedClient setClient(ElasticsearchClient client); @@ -31,75 +30,101 @@ public interface ExtendedClient { */ ElasticsearchClient getClient(); + /** + * Initiative the extended client, cerates instances and connect to cluster, if required. + * + * @param settings settings + * @return this client + * @throws IOException if init fails + */ + ExtendedClient init(Settings settings) throws IOException; + + /** + * Set bulk metric. + * @param bulkMetric the bulk metric + * @return this client + */ ExtendedClient setBulkMetric(BulkMetric bulkMetric); + /** + * Get bulk metric. + * @return the bulk metric + */ BulkMetric getBulkMetric(); + /** + * Set bulk control. + * @param bulkControl the bulk control + * @return this + */ ExtendedClient setBulkControl(BulkControl bulkControl); + /** + * Get buulk control. + * @return the bulk control + */ BulkControl getBulkControl(); /** - * Create new Elasticsearch client, wrap an existing Elasticsearch client. + * Build index definition from settings. * - * @param settings settings - * @return this client - * @throws IOException if init fails + * @param index the index name + * @param settings the settings for the index + * @return index definition + * @throws IOException if settings/mapping URL is invalid/malformed */ - ExtendedClient init(Settings settings) throws IOException; + IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException; /** - * Bulked index request. Each request will be added to a queue for bulking requests. - * Submitting request will be done when bulk limits are exceeded. + * Add index request. Each request will be added to a queue for bulking requests. + * Submitting request will be done when limits are exceeded. * * @param index the index - * @param type the type * @param id the id * @param create true if document must be created * @param source the source * @return this */ - ExtendedClient index(String index, String type, String id, boolean create, BytesReference source); + ExtendedClient index(String index, String id, boolean create, BytesReference source); /** - * Index document. + * Index request. Each request will be added to a queue for bulking requests. + * Submitting request will be done when limits are exceeded. * * @param index the index - * @param type the type * @param id the id * @param create true if document is to be created, false otherwise * @param source the source * @return this client methods */ - ExtendedClient index(String index, String type, String id, boolean create, String source); + ExtendedClient index(String index, String id, boolean create, String source); /** - * Bulked index request. Each request will be added to a queue for bulking requests. + * Index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when bulk limits are exceeded. * * @param indexRequest the index request to add - * @return this ingest + * @return this */ - ExtendedClient indexRequest(IndexRequest indexRequest); + ExtendedClient index(IndexRequest indexRequest); /** - * Delete document. + * Delete request. * * @param index the index - * @param type the type * @param id the id - * @return this ingest + * @return this */ - ExtendedClient delete(String index, String type, String id); + ExtendedClient delete(String index, String id); /** - * Bulked delete request. Each request will be added to a queue for bulking requests. + * Delete request. Each request will be added to a queue for bulking requests. * Submitting request will be done when bulk limits are exceeded. * * @param deleteRequest the delete request to add - * @return this ingest + * @return this */ - ExtendedClient deleteRequest(DeleteRequest deleteRequest); + ExtendedClient delete(DeleteRequest deleteRequest); /** * Bulked update request. Each request will be added to a queue for bulking requests. @@ -107,23 +132,21 @@ public interface ExtendedClient { * Note that updates only work correctly when all operations between nodes are synchronized. * * @param index the index - * @param type the type * @param id the id * @param source the source * @return this */ - ExtendedClient update(String index, String type, String id, BytesReference source); + ExtendedClient update(String index, String id, BytesReference source); /** * Update document. Use with precaution! Does not work in all cases. * * @param index the index - * @param type the type * @param id the id * @param source the source * @return this */ - ExtendedClient update(String index, String type, String id, String source); + ExtendedClient update(String index, String id, String source); /** * Bulked update request. Each request will be added to a queue for bulking requests. @@ -131,207 +154,225 @@ public interface ExtendedClient { * Note that updates only work correctly when all operations between nodes are synchronized. * * @param updateRequest the update request to add - * @return this ingest - */ - ExtendedClient updateRequest(UpdateRequest updateRequest); - - /** - * Set the maximum number of actions per request. - * - * @param maxActionsPerRequest maximum number of actions per request - * @return this ingest - */ - ExtendedClient maxActionsPerRequest(int maxActionsPerRequest); - - /** - * Set the maximum concurent requests. - * - * @param maxConcurentRequests maximum number of concurrent ingest requests - * @return this Ingest - */ - ExtendedClient maxConcurrentRequests(int maxConcurentRequests); - - /** - * Set the maximum volume for request before flush. - * - * @param maxVolume maximum volume - * @return this ingest - */ - ExtendedClient maxVolumePerRequest(String maxVolume); - - /** - * Set the flush interval for automatic flushing outstanding ingest requests. - * - * @param flushInterval the flush interval, default is 30 seconds - * @return this ingest - */ - ExtendedClient flushIngestInterval(String flushInterval); - - /** - * Set mapping. - * - * @param type mapping type - * @param in mapping definition as input stream - * @throws IOException if mapping could not be added + * @return this */ - void mapping(String type, InputStream in) throws IOException; + ExtendedClient update(UpdateRequest updateRequest); /** - * Set mapping. + * Create a new index. * - * @param type mapping type - * @param mapping mapping definition as input stream - * @throws IOException if mapping could not be added + * @param index index + * @return this + * @throws IOException if new index creation fails */ - void mapping(String type, String mapping) throws IOException; + ExtendedClient newIndex(String index) throws IOException; /** - * Put mapping. + * Create a new index. * * @param index index + * @param settings settings + * @param mapping mapping + * @return this + * @throws IOException if settings/mapping is invalid or index creation fails */ - void putMapping(String index); + ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException; /** * Create a new index. * * @param index index - * @return this ingest + * @param settings settings + * @param mapping mapping + * @return this + * @throws IOException if settings/mapping is invalid or index creation fails */ - ExtendedClient newIndex(String index); + ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException; /** * Create a new index. * - * @param index index - * @param type type + * @param index index * @param settings settings - * @param mappings mappings - * @return this ingest - * @throws IOException if new index creation fails + * @param mapping mapping + * @return this + * @throws IOException if settings/mapping is invalid or index creation fails */ - ExtendedClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException; + ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException; /** * Create a new index. - * - * @param index index - * @param settings settings - * @param mappings mappings - * @return this ingest + * @param indexDefinition the index definition + * @return this + * @throws IOException if settings/mapping is invalid or index creation fails */ - ExtendedClient newIndex(String index, Settings settings, Map mappings); + ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException; /** - * Create new mapping. - * - * @param index index - * @param type index type - * @param mapping mapping - * @return this ingest + * Delete an index. + * @param indexDefinition the index definition + * @return this */ - ExtendedClient newMapping(String index, String type, Map mapping); + ExtendedClient deleteIndex(IndexDefinition indexDefinition); /** - * Delete index. + * Delete an index. * * @param index index - * @return this ingest + * @return this */ ExtendedClient deleteIndex(String index); + /** + * Start bulk mode for indexes. + * @param indexDefinition index definition + * @return this + * @throws IOException if bulk could not be started + */ + ExtendedClient startBulk(IndexDefinition indexDefinition) throws IOException; + /** * Start bulk mode. * - * @param index index + * @param index index * @param startRefreshIntervalSeconds refresh interval before bulk * @param stopRefreshIntervalSeconds refresh interval after bulk - * @return this ingest + * @return this * @throws IOException if bulk could not be started */ ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException; + /** + * Stop bulk mode. + * + * @param indexDefinition index definition + * @return this + * @throws IOException if bulk could not be startet + */ + ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException; + /** * Stops bulk mode. * * @param index index - * @return this Ingest + * @param maxWaitTime maximum wait time + * @return this * @throws IOException if bulk could not be stopped */ - ExtendedClient stopBulk(String index) throws IOException; + ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException; /** - * Flush ingest, move all pending documents to the cluster. + * Flush bulk indexing, move all pending documents to the cluster. * * @return this */ ExtendedClient flushIngest(); /** - * Wait for all outstanding responses. + * Update replica level. + * @param indexDefinition the index definition + * @param level the replica level + * @return this + * @throws IOException if replica setting could not be updated + */ + ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException; + + /** + * Update replica level. * + * @param index index + * @param level the replica level * @param maxWaitTime maximum wait time - * @return this ingest - * @throws InterruptedException if wait is interrupted - * @throws ExecutionException if execution failed + * @return this + * @throws IOException if replica setting could not be updated */ - ExtendedClient waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException; + ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException; + + /** + * Get replica level. + * @param indexDefinition the index name + * @return the replica level of the index + */ + int getReplicaLevel(IndexDefinition indexDefinition); + + /** + * Get replica level. + * @param index the index name + * @return the replica level of the index + */ + int getReplicaLevel(String index); /** * Refresh the index. * * @param index index + * @return this */ - void refreshIndex(String index); + ExtendedClient refreshIndex(String index); /** - * Flush the index. + * Flush the index. The cluster clears cache and completes indexing. * * @param index index + * @return this */ - void flushIndex(String index); + ExtendedClient flushIndex(String index); /** - * Update replica level. + * Force segment merge of an index. + * @param indexDefinition th eindex definition + * @return this + */ + boolean forceMerge(IndexDefinition indexDefinition); + + /** + * Force segment merge of an index. + * @param index the index + * @param maxWaitTime maximum wait time + * @return this + */ + boolean forceMerge(String index, String maxWaitTime); + + /** + * Wait for all outstanding bulk responses. * - * @param index index - * @param level the replica level - * @return number of shards after updating replica level - * @throws IOException if replica could not be updated + * @param maxWaitTime maximum wait time + * @return true if wait succeeded, false if wait timed out */ - int updateReplicaLevel(String index, int level) throws IOException; + boolean waitForResponses(String maxWaitTime); /** * Wait for cluster being healthy. * * @param healthColor cluster health color to wait for - * @param timeValue time value - * @throws IOException if wait failed + * @param maxWaitTime time value + * @return true if wait succeeded, false if wait timed out */ - void waitForCluster(String healthColor, String timeValue) throws IOException; + boolean waitForCluster(String healthColor, String maxWaitTime); /** * Get current health color. * + * @param maxWaitTime maximum wait time * @return the cluster health color */ - String healthColor(); + String getHealthColor(String maxWaitTime); /** * Wait for index recovery (after replica change). * * @param index index - * @return number of shards found - * @throws IOException if wait failed + * @param maxWaitTime maximum wait time + * @return true if wait succeeded, false if wait timed out */ - int waitForRecovery(String index) throws IOException; + boolean waitForRecovery(String index, String maxWaitTime); /** * Resolve alias. * * @param alias the alias - * @return one index name behind the alias or the alias if there is no index + * @return this index name behind the alias or the alias if there is no index */ String resolveAlias(String alias); @@ -347,40 +388,72 @@ public interface ExtendedClient { /** * Get all alias filters. * - * @param index index + * @param alias the alias * @return map of alias filters */ - Map getAliasFilters(String index); + Map getAliasFilters(String alias); /** - * Switch aliases from one index to another. + * Get all index filters. + * @param index the index + * @return map of index filters + */ + Map getIndexFilters(String index); + + /** + * Switch from one index to another. + * @param indexDefinition the index definition + * @param extraAliases new aliases + * @return this + */ + ExtendedClient switchIndex(IndexDefinition indexDefinition, List extraAliases); + + /** + * Switch from one index to another. + * @param indexDefinition the index definition + * @param extraAliases new aliases + * @param indexAliasAdder method to add aliases + * @return this + */ + ExtendedClient switchIndex(IndexDefinition indexDefinition, List extraAliases, IndexAliasAdder indexAliasAdder); + + /** + * Switch from one index to another. * * @param index the index name - * @param concreteIndex the index name with timestamp + * @param fullIndexName the index name with timestamp * @param extraAliases a list of names that should be set as index aliases + * @return this */ - void switchAliases(String index, String concreteIndex, List extraAliases); + ExtendedClient switchIndex(String index, String fullIndexName, List extraAliases); /** - * Switch aliases from one index to another. + * Switch from one index to another. * * @param index the index name - * @param concreteIndex the index name with timestamp + * @param fullIndexName the index name with timestamp * @param extraAliases a list of names that should be set as index aliases * @param adder an adder method to create alias term queries + * @return this + */ + ExtendedClient switchIndex(String index, String fullIndexName, List extraAliases, IndexAliasAdder adder); + + /** + * Prune index. + * @param indexDefinition the index definition */ - void switchAliases(String index, String concreteIndex, List extraAliases, IndexAliasAdder adder); + void pruneIndex(IndexDefinition indexDefinition); /** - * Retention policy for an index. All indices before timestampdiff should be deleted, - * but mintokeep indices must be kept. + * Apply retention policy to prune indices. All indices before delta should be deleted, + * but the number of mintokeep indices must be kept. * * @param index index name - * @param concreteIndex index name with timestamp - * @param timestampdiff timestamp delta (for index timestamps) + * @param fullIndexName index name with timestamp + * @param delta timestamp delta (for index timestamps) * @param mintokeep minimum number of indices to keep */ - void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep); + void pruneIndex(String index, String fullIndexName, int delta, int mintokeep); /** * Find the timestamp of the most recently indexed document in the index. @@ -413,7 +486,7 @@ public interface ExtendedClient { Throwable getThrowable(); /** - * Shutdown the ingesting. + * Shutdown the client. * @throws IOException if shutdown fails */ void shutdown() throws IOException; diff --git a/elx-common/src/main/java/org/xbib/elx/common/management/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java similarity index 79% rename from elx-common/src/main/java/org/xbib/elx/common/management/IndexDefinition.java rename to elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 37dcc45..7e12592 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/management/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -1,13 +1,12 @@ -package org.xbib.elx.common.management; +package org.xbib.elx.api; +import java.net.MalformedURLException; import java.net.URL; public class IndexDefinition { private String index; - private String type; - private String fullIndexName; private String dateTimePattern; @@ -28,6 +27,8 @@ public class IndexDefinition { private IndexRetention indexRetention; + private String maxWaitTime; + public IndexDefinition setIndex(String index) { this.index = index; return this; @@ -46,15 +47,11 @@ public class IndexDefinition { return fullIndexName; } - public IndexDefinition setType(String type) { - this.type = type; + public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException { + this.settingsUrl = settingsUrlString != null ? new URL(settingsUrlString) : null; return this; } - public String getType() { - return type; - } - public IndexDefinition setSettingsUrl(URL settingsUrl) { this.settingsUrl = settingsUrl; return this; @@ -64,6 +61,11 @@ public class IndexDefinition { return settingsUrl; } + public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException { + this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null; + return this; + } + public IndexDefinition setMappingsUrl(URL mappingsUrl) { this.mappingsUrl = mappingsUrl; return this; @@ -136,4 +138,12 @@ public class IndexDefinition { return indexRetention; } + public IndexDefinition setMaxWaitTime(String maxWaitTime) { + this.maxWaitTime = maxWaitTime; + return this; + } + + public String getMaxWaitTime() { + return maxWaitTime; + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/management/IndexRetention.java b/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java similarity index 73% rename from elx-common/src/main/java/org/xbib/elx/common/management/IndexRetention.java rename to elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java index 8024ef4..e1445fc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/management/IndexRetention.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java @@ -1,4 +1,4 @@ -package org.xbib.elx.common.management; +package org.xbib.elx.api; public class IndexRetention { @@ -6,12 +6,12 @@ public class IndexRetention { private int minToKeep; - public IndexRetention setTimestampDiff(int timestampDiff) { + public IndexRetention setDelta(int timestampDiff) { this.timestampDiff = timestampDiff; return this; } - public int getTimestampDiff() { + public int getDelta() { return timestampDiff; } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java index 69f2608..e9a2230 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushAction; @@ -33,14 +34,14 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -60,13 +61,12 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; @@ -75,15 +75,11 @@ import org.xbib.elx.api.BulkControl; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.ExtendedClient; import org.xbib.elx.api.IndexAliasAdder; -import org.xbib.elx.common.management.IndexDefinition; -import org.xbib.elx.common.management.IndexRetention; +import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.api.IndexRetention; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.io.StringWriter; -import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.LocalDate; @@ -100,25 +96,33 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery; -import static org.elasticsearch.index.query.QueryBuilders.existsQuery; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; - public abstract class AbstractExtendedClient implements ExtendedClient { private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName()); - private Map mappings; + /** + * The one and only index type name used in the extended client. + * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_". + */ + private static final String TYPE_NAME = "doc"; + /** + * The Elasticsearch client. + */ private ElasticsearchClient client; + /** + * Our replacement for the buk processor. + */ private BulkProcessor bulkProcessor; private BulkMetric bulkMetric; @@ -129,22 +133,9 @@ public abstract class AbstractExtendedClient implements ExtendedClient { private boolean closed; - private int maxActionsPerRequest; - - private int maxConcurrentRequests; - - private String maxVolumePerRequest; - - private String flushIngestInterval; - protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; protected AbstractExtendedClient() { - maxActionsPerRequest = Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum(); - maxConcurrentRequests = Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum(); - maxVolumePerRequest = Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(); - flushIngestInterval = Parameters.DEFAULT_FLUSH_INTERVAL.getString(); - mappings = new HashMap<>(); } @Override @@ -161,6 +152,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public AbstractExtendedClient setBulkMetric(BulkMetric metric) { this.bulkMetric = metric; + // you must start bulk metric or it will bail out at stop() + bulkMetric.start(); return this; } @@ -183,7 +176,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public AbstractExtendedClient init(Settings settings) throws IOException { if (client == null) { - this.client = createClient(settings); + client = createClient(settings); } if (bulkMetric != null) { bulkMetric.start(); @@ -259,17 +252,28 @@ public abstract class AbstractExtendedClient implements ExtendedClient { logger.error("after bulk [" + executionId + "] error", failure); } }; - if (this.client != null) { - BulkProcessor.Builder builder = BulkProcessor.builder((Client)this.client, listener) + if (client != null) { + int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), + Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); + int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), + Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); + TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), + TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), + ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), + "maxVolumePerRequest")); + logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + + "flushIngestInterval = {} maxVolumePerRequest = {}", + maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest); + BulkProcessor.Builder builder = BulkProcessor.builder((Client) client, listener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) - .setFlushInterval(TimeValue.parseTimeValue(flushIngestInterval, null, "flushIngestInterval")); - if (maxVolumePerRequest != null) { - builder.setBulkSize(ByteSizeValue.parseBytesSizeValue(maxVolumePerRequest, "maxVolumePerRequest")); - } + .setFlushInterval(flushIngestInterval) + .setBulkSize(maxVolumePerRequest); this.bulkProcessor = builder.build(); } this.closed = false; + this.throwable = null; return this; } @@ -277,60 +281,97 @@ public abstract class AbstractExtendedClient implements ExtendedClient { public synchronized void shutdown() throws IOException { ensureActive(); if (bulkProcessor != null) { - logger.info("closing bulk processor..."); + logger.info("closing bulk processor"); bulkProcessor.close(); } if (bulkMetric != null) { - logger.info("stopping metric"); + logger.info("stopping metric before bulk stop (for precise measurement)"); bulkMetric.stop(); } if (bulkControl != null && bulkControl.indices() != null && !bulkControl.indices().isEmpty()) { logger.info("stopping bulk mode for indices {}...", bulkControl.indices()); for (String index : bulkControl.indices()) { - stopBulk(index); + stopBulk(index, bulkControl.getMaxWaitTime()); } } + logger.info("shutdown complete"); } @Override - public ExtendedClient maxActionsPerRequest(int maxActionsPerRequest) { - this.maxActionsPerRequest = maxActionsPerRequest; - return this; - } - - @Override - public ExtendedClient maxConcurrentRequests(int maxConcurrentRequests) { - this.maxConcurrentRequests = maxConcurrentRequests; - return this; + public String getClusterName() { + ensureActive(); + try { + ClusterStateRequestBuilder clusterStateRequestBuilder = + new ClusterStateRequestBuilder(client, ClusterStateAction.INSTANCE).all(); + ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet(); + return clusterStateResponse.getClusterName().value(); + } catch (ElasticsearchTimeoutException e) { + logger.warn(e.getMessage(), e); + return "TIMEOUT"; + } catch (NoNodeAvailableException e) { + logger.warn(e.getMessage(), e); + return "DISCONNECTED"; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return "[" + e.getMessage() + "]"; + } } @Override - public ExtendedClient maxVolumePerRequest(String maxVolumePerRequest) { - this.maxVolumePerRequest = maxVolumePerRequest; + public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException { + ensureActive(); + waitForCluster("YELLOW", "30s"); + URL indexSettings = indexDefinition.getSettingsUrl(); + if (indexSettings == null) { + logger.warn("warning while creating index '{}', no settings/mappings", + indexDefinition.getFullIndexName()); + newIndex(indexDefinition.getFullIndexName()); + return this; + } + URL indexMappings = indexDefinition.getMappingsUrl(); + if (indexMappings == null) { + logger.warn("warning while creating index '{}', no mappings", + indexDefinition.getFullIndexName()); + newIndex(indexDefinition.getFullIndexName(), indexSettings.openStream(), null); + return this; + } + try (InputStream indexSettingsInput = indexSettings.openStream(); + InputStream indexMappingsInput = indexMappings.openStream()) { + newIndex(indexDefinition.getFullIndexName(), indexSettingsInput, indexMappingsInput); + } catch (IOException e) { + if (indexDefinition.ignoreErrors()) { + logger.warn(e.getMessage(), e); + logger.warn("warning while creating index '{}' with settings at {} and mappings at {}", + indexDefinition.getFullIndexName(), indexSettings, indexMappings); + } else { + logger.error("error while creating index '{}' with settings at {} and mappings at {}", + indexDefinition.getFullIndexName(), indexSettings, indexMappings); + throw new IOException(e); + } + } return this; } @Override - public ExtendedClient flushIngestInterval(String flushIngestInterval) { - this.flushIngestInterval = flushIngestInterval; - return this; + public ExtendedClient newIndex(String index) { + return newIndex(index, Settings.EMPTY, (Map) null); } @Override - public ExtendedClient newIndex(String index) { - ensureActive(); - return newIndex(index, null, null); + public ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException { + return newIndex(index, + Settings.settingsBuilder().loadFromStream(".json", settings).build(), + JsonXContent.jsonXContent.createParser(mapping).mapOrdered()); } @Override - public ExtendedClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException { - mapping(type, mappings); - return newIndex(index, Settings.settingsBuilder().loadFromStream(".json", settings).build(), - this.mappings); + public ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException { + return newIndex(index, settings, + JsonXContent.jsonXContent.createParser(mapping).mapOrdered()); } @Override - public ExtendedClient newIndex(String index, Settings settings, Map mappings) { + public ExtendedClient newIndex(String index, Settings settings, Map mapping) { ensureActive(); if (index == null) { logger.warn("no index name given to create index"); @@ -339,16 +380,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient { CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE).setIndex(index); if (settings != null) { - logger.info("found settings {}", settings.getAsMap()); createIndexRequestBuilder.setSettings(settings); } - if (mappings != null) { - for (Map.Entry entry : mappings.entrySet()) { - String type = entry.getKey(); - String mapping = entry.getValue(); - logger.info("found mapping for {}", type); - createIndexRequestBuilder.addMapping(type, mapping, XContentType.JSON); - } + if (mapping != null) { + createIndexRequestBuilder.addMapping(TYPE_NAME, mapping); } CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); logger.info("index {} created: {}", index, createIndexResponse); @@ -356,15 +391,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient newMapping(String index, String type, Map mapping) { - PutMappingRequestBuilder putMappingRequestBuilder = - new PutMappingRequestBuilder(client, PutMappingAction.INSTANCE) - .setIndices(index) - .setType(type) - .setSource(mapping); - putMappingRequestBuilder.execute().actionGet(); - logger.info("mapping created for index {} and type {}", index, type); - return this; + public ExtendedClient deleteIndex(IndexDefinition indexDefinition) { + return deleteIndex(indexDefinition.getFullIndexName()); } @Override @@ -380,6 +408,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return this; } + @Override + public ExtendedClient startBulk(IndexDefinition indexDefinition) throws IOException { + startBulk(indexDefinition.getFullIndexName(), -1, 1); + return this; + } + @Override public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException { @@ -395,52 +429,41 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient stopBulk(String index) throws IOException { + public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException { + return stopBulk(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime()); + } + + @Override + public ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException { ensureActive(); if (bulkControl == null) { return this; } - if (bulkControl.isBulk(index)) { - long secs = bulkControl.getStopBulkRefreshIntervals().get(index); - if (secs > 0L) { - updateIndexSetting(index, "refresh_interval", secs + "s"); + flushIngest(); + if (waitForResponses(maxWaitTime)) { + if (bulkControl.isBulk(index)) { + long secs = bulkControl.getStopBulkRefreshIntervals().get(index); + if (secs > 0L) { + updateIndexSetting(index, "refresh_interval", secs + "s"); + } + bulkControl.finishBulk(index); } - bulkControl.finishBulk(index); } return this; } @Override - public ExtendedClient flushIngest() { - ensureActive(); - logger.debug("flushing bulk processor"); - bulkProcessor.flush(); - return this; + public ExtendedClient index(String index, String id, boolean create, BytesReference source) { + return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source)); } @Override - public ExtendedClient waitForResponses(String maxWaitTime) throws InterruptedException { - ensureActive(); - long millis = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueMinutes(1),"millis").getMillis(); - logger.debug("waiting for " + millis + " millis"); - while (!bulkProcessor.awaitClose(millis, TimeUnit.MILLISECONDS)) { - logger.warn("still waiting for responses"); - } - return this; + public ExtendedClient index(String index, String id, boolean create, String source) { + return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source.getBytes(StandardCharsets.UTF_8))); } @Override - public ExtendedClient index(String index, String type, String id, boolean create, BytesReference source) { - return indexRequest(new IndexRequest(index).type(type).id(id).create(create).source(source)); - } - - @Override - public ExtendedClient index(String index, String type, String id, boolean create, String source) { - return indexRequest(new IndexRequest(index).type(type).id(id).create(create).source(source.getBytes(StandardCharsets.UTF_8))); - } - - @Override - public ExtendedClient indexRequest(IndexRequest indexRequest) { + public ExtendedClient index(IndexRequest indexRequest) { ensureActive(); try { if (bulkMetric != null) { @@ -456,12 +479,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient delete(String index, String type, String id) { - return deleteRequest(new DeleteRequest(index).type(type).id(id)); + public ExtendedClient delete(String index, String id) { + return delete(new DeleteRequest(index, TYPE_NAME, id)); } @Override - public ExtendedClient deleteRequest(DeleteRequest deleteRequest) { + public ExtendedClient delete(DeleteRequest deleteRequest) { ensureActive(); try { if (bulkMetric != null) { @@ -477,17 +500,17 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient update(String index, String type, String id, BytesReference source) { - return updateRequest(new UpdateRequest().index(index).type(type).id(id).upsert(source)); + public ExtendedClient update(String index, String id, BytesReference source) { + return update(new UpdateRequest(index, TYPE_NAME, id).doc(source)); } @Override - public ExtendedClient update(String index, String type, String id, String source) { - return updateRequest(new UpdateRequest().index(index).type(type).id(id).upsert(source.getBytes(StandardCharsets.UTF_8))); + public ExtendedClient update(String index, String id, String source) { + return update(new UpdateRequest(index, TYPE_NAME, id).doc(source.getBytes(StandardCharsets.UTF_8))); } @Override - public ExtendedClient updateRequest(UpdateRequest updateRequest) { + public ExtendedClient update(UpdateRequest updateRequest) { ensureActive(); try { if (bulkMetric != null) { @@ -503,54 +526,68 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public void mapping(String type, String mapping) { - mappings.put(type, mapping); + public ExtendedClient flushIngest() { + ensureActive(); + logger.debug("flushing bulk processor"); + bulkProcessor.flush(); + return this; } @Override - public void mapping(String type, InputStream in) throws IOException { - if (type == null) { - return; + public boolean waitForResponses(String maxWaitTime) { + ensureActive(); + long millis = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueMinutes(1),"millis").getMillis(); + logger.debug("waiting for " + millis + " millis"); + try { + return bulkProcessor.awaitFlush(millis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted"); + return false; } - StringWriter sw = new StringWriter(); - Streams.copy(new InputStreamReader(in, StandardCharsets.UTF_8), sw); - mappings.put(type, sw.toString()); } @Override - public int waitForRecovery(String index) throws IOException { + public boolean waitForRecovery(String index, String maxWaitTime) { ensureActive(); - if (index == null) { - throw new IOException("unable to wait for recovery, no index no given"); - } + ensureIndexGiven(index); RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, new RecoveryRequest(index)).actionGet(); int shards = response.getTotalShards(); - client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest(index) - .waitForActiveShards(shards)).actionGet(); - return shards; + TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), + getClass().getSimpleName() + ".timeout"); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest(index) + .waitForActiveShards(shards).timeout(timeout)).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + logger.error("timeout waiting for recovery"); + return false; + } + return true; } @Override - public void waitForCluster(String statusString, String timeout) throws IOException { + public boolean waitForCluster(String statusString, String maxWaitTime) { ensureActive(); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest() - .waitForStatus(status).timeout(timeout)).actionGet(); + TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), + getClass().getSimpleName() + ".timeout"); + ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, + new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + " and not " + status.name() - + ", from here on, everything will fail!"); + logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name()); + return false; } + return true; } @Override - public String healthColor() { + public String getHealthColor(String maxWaitTime) { ensureActive(); try { - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().timeout(TimeValue.timeValueSeconds(30))).actionGet(); + TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), + getClass().getSimpleName() + ".timeout"); + ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, + new ClusterHealthRequest().timeout(timeout)).actionGet(); ClusterHealthStatus status = healthResponse.getStatus(); return status.name(); } catch (ElasticsearchTimeoutException e) { @@ -566,37 +603,55 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public int updateReplicaLevel(String index, int level) throws IOException { - waitForCluster("YELLOW", "30s"); - updateIndexSetting(index, "number_of_replicas", level); - return waitForRecovery(index); + public ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { + return updateReplicaLevel(indexDefinition.getFullIndexName(), level, indexDefinition.getMaxWaitTime()); } @Override - public void flushIndex(String index) { - ensureActive(); - if (index != null) { - client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet(); + public ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException { + waitForCluster("YELLOW", maxWaitTime); // let cluster settle down from critical operations + if (level > 0) { + updateIndexSetting(index, "number_of_replicas", level); + waitForRecovery(index, maxWaitTime); } + return this; } @Override - public void refreshIndex(String index) { - ensureActive(); + public int getReplicaLevel(IndexDefinition indexDefinition) { + return getReplicaLevel(indexDefinition.getFullIndexName()); + } + + @Override + public int getReplicaLevel(String index) { + GetSettingsRequest request = new GetSettingsRequest().indices(index); + GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet(); + int replica = -1; + for (ObjectObjectCursor cursor : response.getIndexToSettings()) { + Settings settings = cursor.value; + if (index.equals(cursor.key)) { + replica = settings.getAsInt("index.number_of_replicas", null); + } + } + return replica; + } + + @Override + public ExtendedClient flushIndex(String index) { if (index != null) { - client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); + ensureActive(); + client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet(); } + return this; } @Override - public void putMapping(String index) { - ensureActive(); - if (mappings != null && !mappings.isEmpty()) { - for (Map.Entry me : mappings.entrySet()) { - client.execute(PutMappingAction.INSTANCE, - new PutMappingRequest(index).type(me.getKey()).source(me.getValue())).actionGet(); - } + public ExtendedClient refreshIndex(String index) { + if (index != null) { + ensureActive(); + client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); } + return this; } @Override @@ -636,16 +691,43 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public void switchAliases(String index, String concreteIndex, List extraAliases) { - switchAliases(index, concreteIndex, extraAliases, null); + public Map getIndexFilters(String index) { + GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE); + return getFilters(getAliasesRequestBuilder.setIndices(index).execute().actionGet()); + } + + @Override + public ExtendedClient switchIndex(IndexDefinition indexDefinition, List extraAliases) { + return switchIndex(indexDefinition, extraAliases, null); + } + + + @Override + public ExtendedClient switchIndex(IndexDefinition indexDefinition, + List extraAliases, IndexAliasAdder indexAliasAdder) { + if (extraAliases == null) { + return this; + } + if (indexDefinition.isSwitchAliases()) { + switchIndex(indexDefinition.getIndex(), + indexDefinition.getFullIndexName(), extraAliases.stream() + .filter(a -> a != null && !a.isEmpty()) + .collect(Collectors.toList()), indexAliasAdder); + } + return this; } @Override - public void switchAliases(String index, String concreteIndex, - List extraAliases, IndexAliasAdder adder) { + public ExtendedClient switchIndex(String index, String fullIndexName, List extraAliases) { + return switchIndex(index, fullIndexName, extraAliases, null); + } + + @Override + public ExtendedClient switchIndex(String index, String fullIndexName, + List extraAliases, IndexAliasAdder adder) { ensureActive(); - if (index.equals(concreteIndex)) { - return; + if (index.equals(fullIndexName)) { + return this; // nothing to switch to } // two situations: 1. there is a new alias 2. there is already an old index with the alias String oldIndex = resolveAlias(index); @@ -655,7 +737,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { IndicesAliasesRequestBuilder requestBuilder = new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE); if (oldFilterMap == null || !oldFilterMap.containsKey(index)) { // never apply a filter for trunk index name - requestBuilder.addAlias(concreteIndex, index); + requestBuilder.addAlias(fullIndexName, index); newAliases.add(index); } // switch existing aliases @@ -665,9 +747,9 @@ public abstract class AbstractExtendedClient implements ExtendedClient { String filter = entry.getValue(); requestBuilder.removeAlias(oldIndex, alias); if (filter != null) { - requestBuilder.addAlias(concreteIndex, alias, filter); + requestBuilder.addAlias(fullIndexName, alias, filter); } else { - requestBuilder.addAlias(concreteIndex, alias); + requestBuilder.addAlias(fullIndexName, alias); } switchAliases.add(alias); } @@ -678,18 +760,18 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (oldFilterMap == null || !oldFilterMap.containsKey(extraAlias)) { // index alias adder only active on extra aliases, and if alias is new if (adder != null) { - adder.addIndexAlias(requestBuilder, concreteIndex, extraAlias); + adder.addIndexAlias(requestBuilder, fullIndexName, extraAlias); } else { - requestBuilder.addAlias(concreteIndex, extraAlias); + requestBuilder.addAlias(fullIndexName, extraAlias); } newAliases.add(extraAlias); } else { String filter = oldFilterMap.get(extraAlias); requestBuilder.removeAlias(oldIndex, extraAlias); if (filter != null) { - requestBuilder.addAlias(concreteIndex, extraAlias, filter); + requestBuilder.addAlias(fullIndexName, extraAlias, filter); } else { - requestBuilder.addAlias(concreteIndex, extraAlias); + requestBuilder.addAlias(fullIndexName, extraAlias); } switchAliases.add(extraAlias); } @@ -699,15 +781,22 @@ public abstract class AbstractExtendedClient implements ExtendedClient { logger.info("new aliases = {}, switch aliases = {}", newAliases, switchAliases); requestBuilder.execute().actionGet(); } + return this; } @Override - public void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep) { - if (timestampdiff == 0 && mintokeep == 0) { + public void pruneIndex(IndexDefinition indexDefinition) { + pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), + indexDefinition.getRetention().getDelta(), indexDefinition.getRetention().getMinToKeep()); + } + + @Override + public void pruneIndex(String index, String fullIndexName, int delta, int mintokeep) { + if (delta == 0 && mintokeep == 0) { return; } ensureActive(); - if (index.equals(concreteIndex)) { + if (index.equals(fullIndexName)) { return; } GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); @@ -717,7 +806,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { logger.info("{} indices", getIndexResponse.getIndices().length); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); - if (m.matches() && index.equals(m.group(1)) && !s.equals(concreteIndex)) { + if (m.matches() && index.equals(m.group(1)) && !s.equals(fullIndexName)) { indices.add(s); } } @@ -734,7 +823,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } List indicesToDelete = new ArrayList<>(); // our index - Matcher m1 = pattern.matcher(concreteIndex); + Matcher m1 = pattern.matcher(fullIndexName); if (m1.matches()) { Integer i1 = Integer.parseInt(m1.group(2)); for (String s : indices) { @@ -742,7 +831,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (m2.matches()) { Integer i2 = Integer.parseInt(m2.group(2)); int kept = indices.size() - indicesToDelete.size(); - if ((timestampdiff == 0 || (timestampdiff > 0 && i1 - i2 > timestampdiff)) && mintokeep <= kept) { + if ((delta == 0 || (delta > 0 && i1 - i2 > delta)) && mintokeep <= kept) { indicesToDelete.add(s); } } @@ -753,9 +842,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient { logger.info("not enough indices found to delete, retention policy complete"); return; } - String[] s = indicesToDelete.toArray(new String[indicesToDelete.size()]); - DeleteIndexRequestBuilder requestBuilder = new DeleteIndexRequestBuilder(client, DeleteIndexAction.INSTANCE, s); - DeleteIndexResponse response = requestBuilder.execute().actionGet(); + String[] s = new String[indicesToDelete.size()]; + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest() + .indices(indicesToDelete.toArray(s)); + DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); if (!response.isAcknowledged()) { logger.warn("retention delete index operation was not acknowledged"); } @@ -782,6 +872,70 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return null; } + @Override + public boolean forceMerge(IndexDefinition indexDefinition) { + if (indexDefinition.hasForceMerge()) { + return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime()); + } + return false; + } + + @Override + public boolean forceMerge(String index, String maxWaitTime) { + TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), + getClass().getSimpleName() + ".timeout"); + ForceMergeRequestBuilder forceMergeRequestBuilder = + new ForceMergeRequestBuilder(client, ForceMergeAction.INSTANCE); + forceMergeRequestBuilder.setIndices(index); + try { + forceMergeRequestBuilder.execute().get(timeout.getMillis(), TimeUnit.MILLISECONDS); + return true; + } catch (TimeoutException e) { + logger.error("timeout"); + } catch (ExecutionException e) { + logger.error(e.getMessage(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error(e.getMessage(), e); + } + return false; + } + + @Override + public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) + throws IOException { + boolean isEnabled = settings.getAsBoolean("enabled", !(client instanceof MockExtendedClient)); + String indexName = settings.get("name", index); + String fullIndexName; + String dateTimePattern = settings.get("dateTimePattern"); + if (dateTimePattern != null) { + fullIndexName = resolveAlias(indexName + + DateTimeFormatter.ofPattern(dateTimePattern) + .withZone(ZoneId.systemDefault()) // not GMT + .format(LocalDate.now())); + logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName); + } else { + fullIndexName = resolveMostRecentIndex(indexName); + logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName); + } + IndexRetention indexRetention = new IndexRetention() + .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) + .setDelta(settings.getAsInt("retention.delta", 0)); + + return new IndexDefinition() + .setIndex(indexName) + .setFullIndexName(fullIndexName) + .setSettingsUrl(settings.get("settings")) + .setMappingsUrl(settings.get("mapping")) + .setDateTimePattern(dateTimePattern) + .setEnabled(isEnabled) + .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) + .setSwitchAliases(settings.getAsBoolean("aliases", true)) + .setReplicaLevel(settings.getAsInt("replica", 0)) + .setMaxWaitTime(settings.get("timout", "30s")) + .setRetention(indexRetention); + } + @Override public boolean hasThrowable() { return throwable != null; @@ -822,9 +976,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } } - public Map getIndexFilters(String index) { - GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE); - return getFilters(getAliasesRequestBuilder.setIndices(index).execute().actionGet()); + private void ensureIndexGiven(String index) { + if (index == null) { + throw new IllegalArgumentException("no index given"); + } } private Map getFilters(GetAliasesResponse getAliasesResponse) { @@ -834,7 +989,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { for (AliasMetaData aliasMetaData : aliasMetaDataList) { if (aliasMetaData.filteringRequired()) { result.put(aliasMetaData.alias(), - new String(aliasMetaData.getFilter().uncompressed(), StandardCharsets.UTF_8) ); + new String(aliasMetaData.getFilter().uncompressed(), StandardCharsets.UTF_8)); } else { result.put(aliasMetaData.alias(), null); } @@ -843,61 +998,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return result; } - @Override - public String getClusterName() { - ensureActive(); - try { - ClusterStateRequestBuilder clusterStateRequestBuilder = - new ClusterStateRequestBuilder(client, ClusterStateAction.INSTANCE).all(); - ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet(); - String name = clusterStateResponse.getClusterName().value(); - int nodeCount = clusterStateResponse.getState().getNodes().size(); - return name + " (" + nodeCount + " nodes connected)"; - } catch (ElasticsearchTimeoutException e) { - logger.warn(e.getMessage(), e); - return "TIMEOUT"; - } catch (NoNodeAvailableException e) { - logger.warn(e.getMessage(), e); - return "DISCONNECTED"; - } catch (Exception e) { - logger.warn(e.getMessage(), e); - return "[" + e.getMessage() + "]"; - } - } - - public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) - throws MalformedURLException { - boolean isEnabled = settings.getAsBoolean("enabled", !(client instanceof MockExtendedClient)); - String indexName = settings.get("name", index); - String fullIndexName; - String dateTimePattern = settings.get("dateTimePattern"); - if (dateTimePattern != null) { - fullIndexName = resolveAlias(indexName + - DateTimeFormatter.ofPattern(dateTimePattern) - .withZone(ZoneId.systemDefault()) // not GMT - .format(LocalDate.now())); - logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName); - } else { - fullIndexName = resolveMostRecentIndex(indexName); - logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName); - } - IndexRetention indexRetention = new IndexRetention() - .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) - .setTimestampDiff(settings.getAsInt("retention.diff", 0)); - return new IndexDefinition() - .setIndex(indexName) - .setFullIndexName(fullIndexName) - .setType(settings.get("type")) - .setSettingsUrl(new URL(settings.get("settings"))) - .setMappingsUrl(new URL(settings.get("mapping"))) - .setDateTimePattern(dateTimePattern) - .setEnabled(isEnabled) - .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) - .setSwitchAliases(settings.getAsBoolean("aliases", true)) - .setReplicaLevel(settings.getAsInt("replica", 0)) - .setRetention(indexRetention); - } - public void checkMapping(String index) { ensureActive(); GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) @@ -914,14 +1014,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient { }); } - @SuppressWarnings("unchecked") private void checkMapping(String index, String type, MappingMetaData mappingMetaData) { try { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchResponse searchResponse = searchRequestBuilder.setSize(0) .setIndices(index) .setTypes(type) - .setQuery(matchAllQuery()) + .setQuery(QueryBuilders.matchAllQuery()) .execute() .actionGet(); long total = searchResponse.getHits().getTotalHits(); @@ -977,8 +1076,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient { checkMapping(index, type, path, key, child, fields); } } else if ("type".equals(key)) { - QueryBuilder filterBuilder = existsQuery(path); - QueryBuilder queryBuilder = constantScoreQuery(filterBuilder); + QueryBuilder filterBuilder = QueryBuilders.existsQuery(path); + QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchResponse searchResponse = searchRequestBuilder.setSize(0) .setIndices(index) @@ -997,148 +1096,4 @@ public abstract class AbstractExtendedClient implements ExtendedClient { .forEachOrdered(e -> result.put(e.getKey(), e.getValue())); return result; } - - @SuppressWarnings("unchecked") - public void createIndex(IndexDefinition indexDefinition) - throws IOException { - ensureActive(); - waitForCluster("YELLOW", "30s"); - URL indexSettings = indexDefinition.getSettingsUrl(); - if (indexSettings == null) { - throw new IllegalArgumentException("no settings defined for index " + indexDefinition.getIndex()); - } - URL indexMappings = indexDefinition.getMappingsUrl(); - if (indexMappings == null) { - throw new IllegalArgumentException("no mappings defined for index " + indexDefinition.getIndex()); - } - try (InputStream indexSettingsInput = indexSettings.openStream(); - InputStream indexMappingsInput = indexMappings.openStream()) { - // multiple type? - if (indexDefinition.getType() == null) { - Map mapping = new HashMap<>(); - // get type names from input stream - Reader reader = new InputStreamReader(indexMappingsInput, StandardCharsets.UTF_8); - Map map = JsonXContent.jsonXContent.createParser(reader).mapOrdered(); - for (Map.Entry entry : map.entrySet()) { - mapping.put(entry.getKey(), JsonXContent.contentBuilder().map((Map) entry.getValue()).string()); - } - Settings settings = Settings.settingsBuilder() - .loadFromStream("", indexSettingsInput) - .build(); - newIndex(indexDefinition.getFullIndexName(), settings, mapping); - } else { - newIndex(indexDefinition.getFullIndexName(), - indexDefinition.getType(), indexSettingsInput, indexMappingsInput); - } - } catch (IOException e) { - if (indexDefinition.ignoreErrors()) { - logger.warn(e.getMessage(), e); - logger.warn("warning while creating index '{}' with settings at {} and mappings at {}", - indexDefinition.getFullIndexName(), indexSettings, indexMappings); - } else { - logger.error("error while creating index '{}' with settings at {} and mappings at {}", - indexDefinition.getFullIndexName(), indexSettings, indexMappings); - throw new IOException(e); - } - } - } - - public void startBulk(Map defs) throws IOException { - ensureActive(); - for (Map.Entry entry : defs.entrySet()) { - IndexDefinition def = entry.getValue(); - startBulk(def.getFullIndexName(), -1, 1); - } - } - - public void stopBulk(Map defs) throws IOException { - ensureActive(); - if (defs == null) { - return; - } - try { - logger.info("flush bulk"); - flushIngest(); - logger.info("waiting for all bulk responses from cluster"); - waitForResponses("120s"); - logger.info("all bulk responses received"); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error(e.getMessage(), e); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - logger.info("updating cluster settings of {}", defs.keySet()); - for (Map.Entry entry : defs.entrySet()) { - IndexDefinition def = entry.getValue(); - stopBulk(def.getFullIndexName()); - } - } - } - - public void forceMerge(Map defs) { - for (Map.Entry entry : defs.entrySet()) { - if (entry.getValue().hasForceMerge()) { - logger.info("force merge of {}", entry.getKey()); - try { - ForceMergeRequestBuilder forceMergeRequestBuilder = - new ForceMergeRequestBuilder(client, ForceMergeAction.INSTANCE); - forceMergeRequestBuilder.setIndices(entry.getValue().getFullIndexName()); - forceMergeRequestBuilder.execute().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error(e.getMessage(), e); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - } - } - - public void switchIndex(IndexDefinition indexDefinition, List extraAliases) { - if (extraAliases == null) { - return; - } - if (indexDefinition.isSwitchAliases()) { - // filter out null/empty values - List validAliases = extraAliases.stream() - .filter(a -> a != null && !a.isEmpty()) - .collect(Collectors.toList()); - try { - switchAliases(indexDefinition.getIndex(), - indexDefinition.getFullIndexName(), validAliases); - } catch (Exception e) { - logger.warn("switching index failed: " + e.getMessage(), e); - } - } - } - - public void switchIndex(IndexDefinition indexDefinition, - List extraAliases, IndexAliasAdder indexAliasAdder) { - if (extraAliases == null) { - return; - } - if (indexDefinition.isSwitchAliases()) { - // filter out null/empty values - List validAliases = extraAliases.stream() - .filter(a -> a != null && !a.isEmpty()) - .collect(Collectors.toList()); - try { - switchAliases(indexDefinition.getIndex(), - indexDefinition.getFullIndexName(), validAliases, indexAliasAdder); - } catch (Exception e) { - logger.warn("switching index failed: " + e.getMessage(), e); - } - } - } - - public void replica(IndexDefinition indexDefinition) { - if (indexDefinition.getReplicaLevel() > 0) { - try { - updateReplicaLevel(indexDefinition.getFullIndexName(), indexDefinition.getReplicaLevel()); - } catch (Exception e) { - logger.warn("setting replica failed: " + e.getMessage(), e); - } - } - } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java index 10f6e62..1aeeed4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java @@ -38,24 +38,24 @@ public class BulkProcessor implements Closeable { private final ScheduledFuture scheduledFuture; - private final AtomicLong executionIdGen = new AtomicLong(); + private final AtomicLong executionIdGen; private final BulkRequestHandler bulkRequestHandler; private BulkRequest bulkRequest; - private volatile boolean closed = false; + private volatile boolean closed; private BulkProcessor(Client client, Listener listener, String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { + this.executionIdGen = new AtomicLong(); + this.closed = false; this.bulkActions = bulkActions; this.bulkSize = bulkSize.bytes(); - this.bulkRequest = new BulkRequest(); this.bulkRequestHandler = concurrentRequests == 0 ? new SyncBulkRequestHandler(client, listener) : new AsyncBulkRequestHandler(client, listener, concurrentRequests); - if (flushInterval != null) { this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), @@ -83,6 +83,7 @@ public class BulkProcessor implements Closeable { @Override public void close() { try { + // 0 = immediate close awaitClose(0, TimeUnit.NANOSECONDS); } catch (InterruptedException exc) { Thread.currentThread().interrupt(); @@ -90,8 +91,27 @@ public class BulkProcessor implements Closeable { } /** - * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are - * flushed. + * Wait for bulk request handler with flush. + * @param timeout the timeout value + * @param unit the timeout unit + * @return true is method was successful, false if timeout + * @throws InterruptedException if timeout + */ + public boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException { + if (closed) { + return true; + } + // flush + if (bulkRequest.numberOfActions() > 0) { + execute(); + } + // wait for all bulk responses + return this.bulkRequestHandler.close(timeout, unit); + } + + /** + * Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called + * once as the last action of a bulk processor. * * If concurrent requests are not enabled, returns {@code true} immediately. * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then @@ -116,7 +136,7 @@ public class BulkProcessor implements Closeable { if (bulkRequest.numberOfActions() > 0) { execute(); } - return this.bulkRequestHandler.awaitClose(timeout, unit); + return this.bulkRequestHandler.close(timeout, unit); } /** @@ -257,7 +277,7 @@ public class BulkProcessor implements Closeable { private int bulkActions = 1000; - private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); + private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB); private TimeValue flushInterval = null; @@ -367,7 +387,7 @@ public class BulkProcessor implements Closeable { void execute(BulkRequest bulkRequest, long executionId); - boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; + boolean close(long timeout, TimeUnit unit) throws InterruptedException; } @@ -398,7 +418,7 @@ public class BulkProcessor implements Closeable { } @Override - public boolean awaitClose(long timeout, TimeUnit unit) { + public boolean close(long timeout, TimeUnit unit) { return true; } } @@ -461,7 +481,7 @@ public class BulkProcessor implements Closeable { } @Override - public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + public boolean close(long timeout, TimeUnit unit) throws InterruptedException { if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) { semaphore.release(concurrentRequests); return true; diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java index dc01807..7e5a1e4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java @@ -6,8 +6,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import java.util.Map; - /** * Mock client, it does not perform actions on a cluster. Useful for testing or dry runs. */ @@ -29,52 +27,32 @@ public class MockExtendedClient extends AbstractExtendedClient { } @Override - public MockExtendedClient maxActionsPerRequest(int maxActions) { - return this; - } - - @Override - public MockExtendedClient maxConcurrentRequests(int maxConcurrentRequests) { - return this; - } - - @Override - public MockExtendedClient maxVolumePerRequest(String maxVolumePerRequest) { - return this; - } - - @Override - public MockExtendedClient flushIngestInterval(String interval) { + public MockExtendedClient index(String index, String id, boolean create, String source) { return this; } @Override - public MockExtendedClient index(String index, String type, String id, boolean create, String source) { + public MockExtendedClient delete(String index, String id) { return this; } @Override - public MockExtendedClient delete(String index, String type, String id) { + public MockExtendedClient update(String index, String id, String source) { return this; } @Override - public MockExtendedClient update(String index, String type, String id, String source) { + public MockExtendedClient index(IndexRequest indexRequest) { return this; } @Override - public MockExtendedClient indexRequest(IndexRequest indexRequest) { + public MockExtendedClient delete(DeleteRequest deleteRequest) { return this; } @Override - public MockExtendedClient deleteRequest(DeleteRequest deleteRequest) { - return this; - } - - @Override - public MockExtendedClient updateRequest(UpdateRequest updateRequest) { + public MockExtendedClient update(UpdateRequest updateRequest) { return this; } @@ -84,17 +62,17 @@ public class MockExtendedClient extends AbstractExtendedClient { } @Override - public MockExtendedClient waitForResponses(String timeValue) { + public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) { return this; } @Override - public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) { + public MockExtendedClient stopBulk(String index, String maxWaitTime) { return this; } @Override - public MockExtendedClient stopBulk(String index) { + public MockExtendedClient newIndex(String index) { return this; } @@ -104,39 +82,38 @@ public class MockExtendedClient extends AbstractExtendedClient { } @Override - public MockExtendedClient newIndex(String index) { + public MockExtendedClient refreshIndex(String index) { return this; } @Override - public MockExtendedClient newMapping(String index, String type, Map mapping) { + public MockExtendedClient flushIndex(String index) { return this; } @Override - public void putMapping(String index) { - } - - @Override - public void refreshIndex(String index) { + public boolean forceMerge(String index, String maxWaitTime) { + return true; } @Override - public void flushIndex(String index) { + public boolean waitForCluster(String healthColor, String timeValue) { + return true; } @Override - public void waitForCluster(String healthColor, String timeValue) { + public boolean waitForResponses(String maxWaitTime) { + return true; } @Override - public int waitForRecovery(String index) { - return -1; + public boolean waitForRecovery(String index, String maxWaitTime) { + return true; } @Override - public int updateReplicaLevel(String index, int level) { - return -1; + public MockExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) { + return this; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 017c780..28d10d7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -8,7 +8,7 @@ public enum Parameters { DEFAULT_MAX_VOLUME_PER_REQUEST("10mb"), - DEFAULT_FLUSH_INTERVAL("30s"), + DEFAULT_FLUSH_INTERVAL(30), MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"), diff --git a/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java b/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java index 624cec5..d606ecc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java +++ b/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java @@ -11,19 +11,26 @@ import java.util.Set; */ public class SimpleBulkControl implements BulkControl { - private final Set indexNames = new HashSet<>(); + private final Set indexNames; - private final Map startBulkRefreshIntervals = new HashMap<>(); + private final Map startBulkRefreshIntervals; - private final Map stopBulkRefreshIntervals = new HashMap<>(); + private final Map stopBulkRefreshIntervals; + + private String maxWaitTime; + + public SimpleBulkControl() { + indexNames = new HashSet<>(); + startBulkRefreshIntervals = new HashMap<>(); + stopBulkRefreshIntervals = new HashMap<>(); + maxWaitTime = "30s"; + } @Override public void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval) { - synchronized (indexNames) { - indexNames.add(indexName); - startBulkRefreshIntervals.put(indexName, startRefreshInterval); - stopBulkRefreshIntervals.put(indexName, stopRefreshInterval); - } + indexNames.add(indexName); + startBulkRefreshIntervals.put(indexName, startRefreshInterval); + stopBulkRefreshIntervals.put(indexName, stopRefreshInterval); } @Override @@ -33,9 +40,7 @@ public class SimpleBulkControl implements BulkControl { @Override public void finishBulk(String indexName) { - synchronized (indexNames) { - indexNames.remove(indexName); - } + indexNames.remove(indexName); } @Override @@ -53,4 +58,9 @@ public class SimpleBulkControl implements BulkControl { return stopBulkRefreshIntervals; } + @Override + public String getMaxWaitTime() { + return maxWaitTime; + } + } diff --git a/elx-common/src/main/java/org/xbib/elx/common/management/package-info.java b/elx-common/src/main/java/org/xbib/elx/common/management/package-info.java deleted file mode 100644 index 0d98623..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/management/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package org.xbib.elx.common.management; \ No newline at end of file diff --git a/elx-common/src/test/java/org/xbib/elx/common/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/AliasTest.java index e9106d0..419da0e 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/AliasTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/AliasTest.java @@ -12,11 +12,11 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.common.Strings; import org.junit.Test; -import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.Set; @@ -32,9 +32,10 @@ public class AliasTest extends NodeTestUtils { private static final Logger logger = LogManager.getLogger(AliasTest.class.getName()); @Test - public void testAlias() throws IOException { + public void testAlias() { + Client client = client("1"); CreateIndexRequest indexRequest = new CreateIndexRequest("test"); - client("1").admin().indices().create(indexRequest).actionGet(); + client.admin().indices().create(indexRequest).actionGet(); // put alias IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); String[] indices = new String[]{"test"}; @@ -42,11 +43,11 @@ public class AliasTest extends NodeTestUtils { IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); indicesAliasesRequest.addAliasAction(aliasAction); - client("1").admin().indices().aliases(indicesAliasesRequest).actionGet(); + client.admin().indices().aliases(indicesAliasesRequest).actionGet(); // get alias GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY); long t0 = System.nanoTime(); - GetAliasesResponse getAliasesResponse = client("1").admin().indices().getAliases(getAliasesRequest).actionGet(); + GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(getAliasesRequest).actionGet(); long t1 = (System.nanoTime() - t0) / 1000000; logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); assertTrue(t1 >= 0); @@ -54,22 +55,23 @@ public class AliasTest extends NodeTestUtils { @Test public void testMostRecentIndex() { + Client client = client("1"); String alias = "test"; CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101"); - client("1").admin().indices().create(indexRequest).actionGet(); + client.admin().indices().create(indexRequest).actionGet(); indexRequest = new CreateIndexRequest("test20160102"); - client("1").admin().indices().create(indexRequest).actionGet(); + client.admin().indices().create(indexRequest).actionGet(); indexRequest = new CreateIndexRequest("test20160103"); - client("1").admin().indices().create(indexRequest).actionGet(); + client.admin().indices().create(indexRequest).actionGet(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); String[] indices = new String[]{"test20160101", "test20160102", "test20160103"}; String[] aliases = new String[]{alias}; IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); indicesAliasesRequest.addAliasAction(aliasAction); - client("1").admin().indices().aliases(indicesAliasesRequest).actionGet(); + client.admin().indices().aliases(indicesAliasesRequest).actionGet(); - GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client("1"), + GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE); GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); diff --git a/elx-node/src/test/java/org/elasticsearch/node/MockNode.java b/elx-node/src/test/java/org/elasticsearch/node/MockNode.java index aad8b8b..1de4c2f 100644 --- a/elx-node/src/test/java/org/elasticsearch/node/MockNode.java +++ b/elx-node/src/test/java/org/elasticsearch/node/MockNode.java @@ -10,10 +10,6 @@ import java.util.Collection; public class MockNode extends Node { - public MockNode() { - super(Settings.EMPTY); - } - public MockNode(Settings settings) { super(settings); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java index 97cb185..ad75a95 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java @@ -32,7 +32,7 @@ public class ExtendeNodeDuplicateIDTest extends NodeTestUtils { try { client.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { - client.index("test", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientSingleNodeTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientSingleNodeTest.java deleted file mode 100644 index 0d7335d..0000000 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientSingleNodeTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.xbib.elx.node; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.junit.Test; -import org.xbib.elx.common.ClientBuilder; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -public class ExtendedNodeClientSingleNodeTest extends NodeTestUtils { - - private static final Logger logger = LogManager.getLogger(ExtendedNodeClientSingleNodeTest.class.getSimpleName()); - - @Test - public void testSingleDocNodeClient() throws Exception { - final ExtendedNodeClient client = ClientBuilder.builder(client("1")) - .provider(ExtendedNodeClientProvider.class) - .build(); - try { - client.newIndex("test"); - client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flushIngest(); - client.waitForResponses("30s"); - } catch (InterruptedException e) { - // ignore - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - client.shutdown(); - } - } -} diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java index 957972d..de28b4b 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java @@ -3,6 +3,7 @@ package org.xbib.elx.node; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,6 +13,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -44,7 +46,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } @Test - public void testSingleDocNodeClient() throws Exception { + public void testSingleDoc() throws Exception { final ExtendedNodeClient client = ClientBuilder.builder(client("1")) .provider(ExtendedNodeClientProvider.class) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) @@ -52,11 +54,9 @@ public class ExtendedNodeClientTest extends NodeTestUtils { .build(); try { client.newIndex("test"); - client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest client.flushIngest(); client.waitForResponses("30s"); - } catch (InterruptedException e) { - // ignore } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { @@ -70,7 +70,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } @Test - public void testNewIndexNodeClient() throws Exception { + public void testNewIndex() throws Exception { final ExtendedNodeClient client = ClientBuilder.builder(client("1")) .provider(ExtendedNodeClientProvider.class) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) @@ -84,14 +84,14 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } @Test - public void testMappingNodeClient() throws Exception { + public void testMapping() throws Exception { final ExtendedNodeClient client = ClientBuilder.builder(client("1")) .provider(ExtendedNodeClientProvider.class) .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) .build(); XContentBuilder builder = jsonBuilder() .startObject() - .startObject("test") + .startObject("doc") .startObject("properties") .startObject("location") .field("type", "geo_point") @@ -99,12 +99,12 @@ public class ExtendedNodeClientTest extends NodeTestUtils { .endObject() .endObject() .endObject(); - client.mapping("test", builder.string()); - client.newIndex("test"); + client.newIndex("test", Settings.EMPTY, builder.string()); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test"); GetMappingsResponse getMappingsResponse = client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); logger.info("mappings={}", getMappingsResponse.getMappings()); + assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc")); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } @@ -113,7 +113,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } @Test - public void testRandomDocsNodeClient() throws Exception { + public void testRandomDocs() throws Exception { long numactions = ACTIONS; final ExtendedNodeClient client = ClientBuilder.builder(client("1")) .provider(ExtendedNodeClientProvider.class) @@ -123,7 +123,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { try { client.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { - client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); @@ -145,7 +145,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } @Test - public void testThreadedRandomDocsNodeClient() throws Exception { + public void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final Long actions = ACTIONS; @@ -165,7 +165,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { for (int i = 0; i < maxthreads; i++) { pool.execute(() -> { for (int i1 = 0; i1 < actions; i1++) { - client.index("test", "test", null, false,"{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test", null, false,"{ \"name\" : \"" + randomString(32) + "\"}"); } latch.countDown(); }); @@ -184,7 +184,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.stopBulk("test"); + client.stopBulk("test", "30s"); assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java index 1503fee..222b261 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java @@ -27,23 +27,23 @@ public class ExtendedNodeIndexAliasTest extends NodeTestUtils { try { client.newIndex("test1234"); for (int i = 0; i < 1; i++) { - client.index("test1234", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.refreshIndex("test1234"); List simpleAliases = Arrays.asList("a", "b", "c"); - client.switchAliases("test", "test1234", simpleAliases); + client.switchIndex("test", "test1234", simpleAliases); client.newIndex("test5678"); for (int i = 0; i < 1; i++) { - client.index("test5678", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.refreshIndex("test5678"); simpleAliases = Arrays.asList("d", "e", "f"); - client.switchAliases("test", "test5678", simpleAliases, (builder, index, alias) -> + client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) -> builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias))); Map aliases = client.getIndexFilters("test5678"); logger.info("aliases of index test5678 = {}", aliases); diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java index 89de2df..ed53cf5 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java @@ -17,6 +17,7 @@ import org.junit.Ignore; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; +import java.util.HashMap; import java.util.Map; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -51,14 +52,14 @@ public class ExtendedNodeReplicaTest extends NodeTestUtils { .build(); try { - client.newIndex("test1", settingsTest1, null) - .newIndex("test2", settingsTest2, null); + client.newIndex("test1", settingsTest1, new HashMap<>()) + .newIndex("test2", settingsTest2, new HashMap<>()); client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 1234; i++) { - client.index("test1", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } for (int i = 0; i < 1234; i++) { - client.index("test2", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java new file mode 100644 index 0000000..adac4c6 --- /dev/null +++ b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java @@ -0,0 +1,69 @@ +package org.xbib.elx.node; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.common.settings.Settings; +import org.junit.Test; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.SimpleBulkControl; +import org.xbib.elx.common.SimpleBulkMetric; +import org.xbib.elx.api.IndexDefinition; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class ExtendedNodeSmokeTest extends NodeTestUtils { + + private static final Logger logger = LogManager.getLogger(ExtendedNodeSmokeTest.class.getSimpleName()); + + @Test + public void smokeTest() throws Exception { + final ExtendedNodeClient client = ClientBuilder.builder(client("1")) + .provider(ExtendedNodeClientProvider.class) + .build(); + try { + client.setBulkControl(new SimpleBulkControl()); + client.setBulkMetric(new SimpleBulkMetric()); + client.newIndex("test"); + client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + client.flushIngest(); + client.waitForResponses("30s"); + + assertEquals(clusterName, client.getClusterName()); + + client.checkMapping("test"); + + client.update("test", "1", "{ \"name\" : \"Another name\"}"); + client.flushIngest(); + + client.waitForRecovery("test", "10s"); + + client.delete("test", "1"); + client.deleteIndex("test"); + + IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test2", Settings.settingsBuilder() + .build()); + assertEquals(0, indexDefinition.getReplicaLevel()); + client.newIndex(indexDefinition); + client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); + client.flushIngest(); + client.updateReplicaLevel(indexDefinition, 2); + + int replica = client.getReplicaLevel(indexDefinition); + assertEquals(2, replica); + + client.deleteIndex(indexDefinition); + assertEquals(0, client.getBulkMetric().getFailed().getCount()); + assertEquals(4, client.getBulkMetric().getSucceeded().getCount()); + } catch (NoNodeAvailableException e) { + logger.warn("skipping, no node available"); + } finally { + if (client.hasThrowable()) { + logger.error("error", client.getThrowable()); + } + assertFalse(client.hasThrowable()); + client.shutdown(); + } + } +} diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java index a1b29a4..01e1df4 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java @@ -8,6 +8,8 @@ import org.junit.Ignore; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; +import java.util.HashMap; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -38,15 +40,15 @@ public class ExtendedNodeUpdateReplicaLevelTest extends NodeTestUtils { .build(); try { - client.newIndex("replicatest", settings, null); + client.newIndex("replicatest", settings, new HashMap<>()); client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 12345; i++) { - client.index("replicatest", "replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); - shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel); - assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); + client.updateReplicaLevel("replicatest", replicaLevel, "30s"); + //assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java b/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java index 9a4750e..05bf386 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java +++ b/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java @@ -47,11 +47,7 @@ public class NodeTestUtils { private AtomicInteger counter = new AtomicInteger(); - private String cluster; - - private String host; - - private int port; + protected String clusterName; private static void deleteFiles() throws IOException { Path directory = Paths.get(getHome() + "/data"); @@ -115,7 +111,7 @@ public class NodeTestUtils { } protected void setClusterName() { - this.cluster = "test-helper-cluster-" + this.clusterName = "test-helper-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "-" + System.getProperty("user.name") + "-" + counter.incrementAndGet(); @@ -123,7 +119,7 @@ public class NodeTestUtils { protected Settings getNodeSettings() { return settingsBuilder() - .put("cluster.name", cluster) + .put("cluster.name", clusterName) .put("cluster.routing.schedule", "50ms") .put("cluster.routing.allocation.disk.threshold_enabled", false) .put("discovery.zen.multicast.enabled", true) @@ -171,8 +167,8 @@ public class NodeTestUtils { .publishAddress(); if (obj instanceof InetSocketTransportAddress) { InetSocketTransportAddress address = (InetSocketTransportAddress) obj; - host = address.address().getHostName(); - port = address.address().getPort(); + String host = address.address().getHostName(); + int port = address.address().getPort(); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java index f66ac58..e883091 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.jboss.netty.channel.DefaultChannelFuture; import org.xbib.elx.common.AbstractExtendedClient; import org.xbib.elx.common.util.NetworkUtils; @@ -36,15 +37,22 @@ public class ExtendedTransportClient extends AbstractExtendedClient { + " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.version") + " Elasticsearch " + Version.CURRENT.toString(); - logger.info("creating transport client on {} with effective settings {}", - systemIdentifier, settings.getAsMap()); - TransportClient.Builder builder = TransportClient.builder() - .settings(Settings.builder() - .put("cluster.name", settings.get("cluster.name")) - .put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) - .put("client.transport.ignore_cluster_name", true) - .build()); - return builder.build(); + Settings effectiveSettings = Settings.builder() + // for thread pool size + .put("processors", + settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) + .put("client.transport.sniff", false) // do not sniff + .put("client.transport.nodes_sampler_interval", "1m") // do not ping + .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect + .put("client.transport.ignore_cluster_name", true) // connect to any cluster + // custom settings may override defaults + .put(settings) + .build(); + logger.info("creating transport client on {} with custom settings {} and effective settings {}", + systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); + // we need to disable dead lock check because we may have mixed node/transport clients + DefaultChannelFuture.setUseDeadLockChecker(false); + return TransportClient.builder().settings(effectiveSettings).build(); } return null; } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java index cb9dba9..3422762 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java @@ -21,11 +21,9 @@ public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils { .build(); try { client.newIndex("test"); - client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest client.flushIngest(); client.waitForResponses("30s"); - } catch (InterruptedException e) { - // ignore } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java index cfca3da..5085df6 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java @@ -2,25 +2,31 @@ package org.xbib.elx.transport; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Before; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; - +import static org.junit.Assert.assertTrue; public class ExtendedTransportClientTest extends NodeTestUtils { @@ -41,7 +47,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { } @Test - public void testBulkClient() throws Exception { + public void testClientIndexOp() throws Exception { final ExtendedTransportClient client = ClientBuilder.builder() .provider(ExtendedTransportClientProvider.class) .put(getSettings()) @@ -68,7 +74,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { } @Test - public void testSingleDocBulkClient() throws Exception { + public void testSingleDoc() throws Exception { final ExtendedTransportClient client = ClientBuilder.builder() .provider(ExtendedTransportClientProvider.class) .put(getSettings()) @@ -77,11 +83,9 @@ public class ExtendedTransportClientTest extends NodeTestUtils { .build(); try { client.newIndex("test"); - client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); + client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); client.flushIngest(); client.waitForResponses("30s"); - } catch (InterruptedException e) { - // ignore } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { @@ -95,7 +99,37 @@ public class ExtendedTransportClientTest extends NodeTestUtils { } @Test - public void testRandomDocsBulkClient() throws Exception { + public void testMapping() throws Exception { + final ExtendedTransportClient client = ClientBuilder.builder() + .provider(ExtendedTransportClientProvider.class) + .put(getSettings()) + .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) + .build(); + XContentBuilder builder = jsonBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + client.newIndex("test", Settings.EMPTY, builder.string()); + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test"); + GetMappingsResponse getMappingsResponse = + client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); + logger.info("mappings={}", getMappingsResponse.getMappings()); + assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc")); + if (client.hasThrowable()) { + logger.error("error", client.getThrowable()); + } + assertFalse(client.hasThrowable()); + client.shutdown(); + } + + @Test + public void testRandomDocs() throws Exception { long numactions = ACTIONS; final ExtendedTransportClient client = ClientBuilder.builder() .provider(ExtendedTransportClientProvider.class) @@ -106,12 +140,10 @@ public class ExtendedTransportClientTest extends NodeTestUtils { try { client.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { - client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); - } catch (InterruptedException e) { - // ignore } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { @@ -125,7 +157,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { } @Test - public void testThreadedRandomDocsBulkClient() throws Exception { + public void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); long maxactions = MAX_ACTIONS_PER_REQUEST; final long maxloop = ACTIONS; @@ -142,7 +174,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) .build(); try { - client.newIndex("test", settingsForIndex, null) + client.newIndex("test", settingsForIndex, new HashMap<>()) .startBulk("test", -1, 1000); ThreadPoolExecutor pool = EsExecutors.newFixed("bulkclient-test", maxthreads, 30, EsExecutors.daemonThreadFactory("bulkclient-test")); @@ -150,7 +182,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { for (int i = 0; i < maxthreads; i++) { pool.execute(() -> { for (int i1 = 0; i1 < maxloop; i1++) { - client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test",null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } latch.countDown(); }); @@ -164,15 +196,16 @@ public class ExtendedTransportClientTest extends NodeTestUtils { pool.shutdown(); logger.info("poot shut down"); } + client.stopBulk("test", "30s"); + assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount()); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.stopBulk("test"); - assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount()); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } assertFalse(client.hasThrowable()); + // extra search lookup client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) // to avoid NPE at org.elasticsearch.action.search.SearchRequest.writeTo(SearchRequest.java:580) diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java index 1fa73c9..ea1cd1d 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java @@ -31,7 +31,7 @@ public class ExtendedTransportDuplicateIDTest extends NodeTestUtils { try { client.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { - client.index("test", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java index a2dfb4c..8d27231 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java @@ -4,7 +4,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.index.query.QueryBuilders; -import org.junit.Ignore; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; @@ -13,8 +12,8 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; -@Ignore public class ExtendedTransportIndexAliasTest extends NodeTestUtils { private static final Logger logger = LogManager.getLogger(ExtendedTransportIndexAliasTest.class.getSimpleName()); @@ -23,43 +22,53 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils { public void testIndexAlias() throws Exception { final ExtendedTransportClient client = ClientBuilder.builder() .provider(ExtendedTransportClientProvider.class) - .build(); + .put(getSettings()).build(); try { client.newIndex("test1234"); for (int i = 0; i < 1; i++) { - client.index("test1234", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.refreshIndex("test1234"); List simpleAliases = Arrays.asList("a", "b", "c"); - client.switchAliases("test", "test1234", simpleAliases); + client.switchIndex("test", "test1234", simpleAliases); client.newIndex("test5678"); for (int i = 0; i < 1; i++) { - client.index("test5678", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.refreshIndex("test5678"); simpleAliases = Arrays.asList("d", "e", "f"); - client.switchAliases("test", "test5678", simpleAliases, (builder, index, alias) -> + client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) -> builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias))); - Map aliases = client.getIndexFilters("test5678"); - logger.info("aliases of index test5678 = {}", aliases); + Map indexFilters = client.getIndexFilters("test5678"); + logger.info("index filters of index test5678 = {}", indexFilters); + assertTrue(indexFilters.containsKey("a")); + assertTrue(indexFilters.containsKey("b")); + assertTrue(indexFilters.containsKey("c")); + assertTrue(indexFilters.containsKey("d")); + assertTrue(indexFilters.containsKey("e")); - aliases = client.getAliasFilters("test"); + Map aliases = client.getAliasFilters("test"); logger.info("aliases of alias test = {}", aliases); + assertTrue(aliases.containsKey("a")); + assertTrue(aliases.containsKey("b")); + assertTrue(aliases.containsKey("c")); + assertTrue(aliases.containsKey("d")); + assertTrue(aliases.containsKey("e")); + client.waitForResponses("30s"); + assertFalse(client.hasThrowable()); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.waitForResponses("30s"); - client.shutdown(); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } - assertFalse(client.hasThrowable()); + client.shutdown(); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java index 6b6d6d4..be95352 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java @@ -16,15 +16,13 @@ import org.elasticsearch.index.indexing.IndexingStats; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; +import java.util.HashMap; import java.util.Map; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -/** - * - */ public class ExtendedTransportReplicaTest extends NodeTestUtils { private static final Logger logger = LogManager.getLogger(ExtendedTransportReplicaTest.class.getSimpleName()); @@ -53,29 +51,30 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils { .build(); try { - client.newIndex("test1", settingsTest1, null) - .newIndex("test2", settingsTest2, null); + client.newIndex("test1", settingsTest1, new HashMap<>()) + .newIndex("test2", settingsTest2, new HashMap<>()); client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 1234; i++) { - client.index("test1", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } for (int i = 0; i < 1234; i++) { - client.index("test2", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); + client.refreshIndex("test1"); + client.refreshIndex("test2"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - logger.info("refreshing"); - client.refreshIndex("test1"); - client.refreshIndex("test2"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) .setIndices("test1", "test2") .setQuery(matchAllQuery()); long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits(); logger.info("query total hits={}", hits); assertEquals(2468, hits); + + // TODO move to api IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.getClient(), IndicesStatsAction.INSTANCE).all(); IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet(); @@ -93,16 +92,15 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils { } } try { - client.deleteIndex("test1") - .deleteIndex("test2"); + client.deleteIndex("test1").deleteIndex("test2"); } catch (Exception e) { logger.error("delete index failed, ignored. Reason:", e); } - client.shutdown(); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); } assertFalse(client.hasThrowable()); + client.shutdown(); } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java index 17d71d6..1992021 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java @@ -7,6 +7,8 @@ import org.elasticsearch.common.settings.Settings; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; +import java.util.HashMap; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -37,15 +39,15 @@ public class ExtendedTransportUpdateReplicaLevelTest extends NodeTestUtils { .build(); try { - client.newIndex("replicatest", settings, null); + client.newIndex("replicatest", settings, new HashMap<>()); client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 12345; i++) { - client.index("replicatest", "replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + client.index("replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); client.waitForResponses("30s"); - shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel); - assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); + client.updateReplicaLevel("replicatest", replicaLevel, "30s"); + //assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/gradle.properties b/gradle.properties index b6d2ad5..c961b52 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.3 +version = 2.2.1.4 xbib-metrics.version = 1.1.0 xbib-guice.version = 4.0.4 diff --git a/gradle/sonarqube.gradle b/gradle/sonarqube.gradle deleted file mode 100644 index 5de408d..0000000 --- a/gradle/sonarqube.gradle +++ /dev/null @@ -1,41 +0,0 @@ -tasks.withType(FindBugs) { - ignoreFailures = true - reports { - xml.enabled = true - html.enabled = false - } -} -tasks.withType(Pmd) { - ignoreFailures = true - reports { - xml.enabled = true - html.enabled = true - } -} -tasks.withType(Checkstyle) { - ignoreFailures = true - reports { - xml.enabled = true - html.enabled = true - } -} - -jacocoTestReport { - reports { - xml.enabled true - csv.enabled false - xml.destination "${buildDir}/reports/jacoco-xml" - html.destination "${buildDir}/reports/jacoco-html" - } -} - -sonarqube { - properties { - property "sonar.projectName", "${project.group} ${project.name}" - property "sonar.sourceEncoding", "UTF-8" - property "sonar.tests", "src/integration-test/java" - property "sonar.scm.provider", "git" - property "sonar.java.coveragePlugin", "jacoco" - property "sonar.junit.reportsPath", "build/test-results/test/" - } -}