From 5184b75b36d298ac6641e673282aff7a492c19a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 20 Feb 2019 18:38:28 +0100 Subject: [PATCH] refactored bulk controller, improved api, renamed und cleaned methods, result classes, removed redundant code and methods --- .../java/org/xbib/elx/api/BulkControl.java | 21 - .../java/org/xbib/elx/api/BulkController.java | 36 ++ .../java/org/xbib/elx/api/BulkMetric.java | 11 +- .../java/org/xbib/elx/api/BulkProcessor.java | 64 ++ .../java/org/xbib/elx/api/ExtendedClient.java | 143 ++--- .../org/xbib/elx/api/IndexDefinition.java | 145 +---- .../org/xbib/elx/api/IndexPruneResult.java | 16 + .../java/org/xbib/elx/api/IndexRetention.java | 24 +- .../org/xbib/elx/api/IndexShiftResult.java | 10 + .../elx/common/AbstractExtendedClient.java | 576 ++++++++---------- .../org/xbib/elx/common/ClientBuilder.java | 24 +- .../elx/common/DefaultBulkController.java | 309 ++++++++++ ...BulkMetric.java => DefaultBulkMetric.java} | 28 +- ...ocessor.java => DefaultBulkProcessor.java} | 147 ++--- .../elx/common/DefaultIndexDefinition.java | 214 +++++++ .../elx/common/DefaultIndexRetention.java | 32 + .../xbib/elx/common/MockExtendedClient.java | 28 +- .../xbib/elx/common/SimpleBulkControl.java | 66 -- .../org/xbib/elx/common/io/package-info.java | 5 +- .../xbib/elx/common/util/package-info.java | 5 +- .../org/xbib/elx/node/ExtendedNodeClient.java | 5 +- .../java/org/xbib/elx/node/package-info.java | 4 + ...dedNodeClientTest.java => ClientTest.java} | 58 +- ...erBlockTest.java => ClusterBlockTest.java} | 16 +- ...licateIDTest.java => DuplicateIDTest.java} | 19 +- .../ExtendedNodeUpdateReplicaLevelTest.java | 63 -- ...ndexAliasTest.java => IndexShiftTest.java} | 44 +- .../java/org/xbib/elx/node/NodeTestUtils.java | 15 - ...dNodeReplicaTest.java => ReplicaTest.java} | 61 +- ...endedNodeSmokeTest.java => SmokeTest.java} | 30 +- .../transport/ExtendedTransportClient.java | 8 +- .../xbib/elx/transport/TransportClient.java | 4 +- ...ansportClientTest.java => ClientTest.java} | 60 +- ...licateIDTest.java => DuplicateIDTest.java} | 22 +- ...tendedTransportUpdateReplicaLevelTest.java | 61 -- ...ndexAliasTest.java => IndexShiftTest.java} | 27 +- ...sportReplicaTest.java => ReplicaTest.java} | 63 +- ...ientSingleNodeTest.java => SmokeTest.java} | 20 +- gradle.properties | 2 +- 39 files changed, 1398 insertions(+), 1088 deletions(-) delete mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkControl.java create mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkController.java create mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java create mode 100644 elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java create mode 100644 elx-api/src/main/java/org/xbib/elx/api/IndexShiftResult.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java rename elx-common/src/main/java/org/xbib/elx/common/{SimpleBulkMetric.java => DefaultBulkMetric.java} (78%) rename elx-common/src/main/java/org/xbib/elx/common/{BulkProcessor.java => DefaultBulkProcessor.java} (80%) create mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java delete mode 100644 elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java create mode 100644 elx-node/src/main/java/org/xbib/elx/node/package-info.java rename elx-node/src/test/java/org/xbib/elx/node/{ExtendedNodeClientTest.java => ClientTest.java} (83%) rename elx-node/src/test/java/org/xbib/elx/node/{ExtendedNodeClusterBlockTest.java => ClusterBlockTest.java} (73%) rename elx-node/src/test/java/org/xbib/elx/node/{ExtendeNodeDuplicateIDTest.java => DuplicateIDTest.java} (78%) delete mode 100644 elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java rename elx-node/src/test/java/org/xbib/elx/node/{ExtendedNodeIndexAliasTest.java => IndexShiftTest.java} (50%) rename elx-node/src/test/java/org/xbib/elx/node/{ExtendedNodeReplicaTest.java => ReplicaTest.java} (66%) rename elx-node/src/test/java/org/xbib/elx/node/{ExtendedNodeSmokeTest.java => SmokeTest.java} (70%) rename elx-transport/src/test/java/org/xbib/elx/transport/{ExtendedTransportClientTest.java => ClientTest.java} (83%) rename elx-transport/src/test/java/org/xbib/elx/transport/{ExtendedTransportDuplicateIDTest.java => DuplicateIDTest.java} (74%) delete mode 100644 elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java rename elx-transport/src/test/java/org/xbib/elx/transport/{ExtendedTransportIndexAliasTest.java => IndexShiftTest.java} (72%) rename elx-transport/src/test/java/org/xbib/elx/transport/{ExtendedTransportReplicaTest.java => ReplicaTest.java} (65%) rename elx-transport/src/test/java/org/xbib/elx/transport/{ExtendedTransportClientSingleNodeTest.java => SmokeTest.java} (62%) diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java b/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java deleted file mode 100644 index c43c137..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkControl.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.xbib.elx.api; - -import java.util.Map; -import java.util.Set; - -public interface BulkControl { - - void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval); - - boolean isBulk(String indexName); - - void finishBulk(String indexName); - - Set indices(); - - Map getStartBulkRefreshIntervals(); - - Map getStopBulkRefreshIntervals(); - - String getMaxWaitTime(); -} diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java new file mode 100644 index 0000000..69906ca --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -0,0 +1,36 @@ +package org.xbib.elx.api; + +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.settings.Settings; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public interface BulkController extends Closeable, Flushable { + + void init(Settings settings); + + Throwable getLastBulkError(); + + void startBulkMode(IndexDefinition indexDefinition) throws IOException; + + void startBulkMode(String indexName, long startRefreshIntervalInSeconds, + long stopRefreshIntervalInSeconds) throws IOException; + + void index(IndexRequest indexRequest); + + void delete(DeleteRequest deleteRequest); + + void update(UpdateRequest updateRequest); + + boolean waitForResponses(long timeout, TimeUnit timeUnit); + + void stopBulkMode(IndexDefinition indexDefinition) throws IOException; + + void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException; + +} diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index 3002b8c..3a406fb 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,9 +1,14 @@ package org.xbib.elx.api; +import org.elasticsearch.common.settings.Settings; import org.xbib.metrics.Count; import org.xbib.metrics.Metered; -public interface BulkMetric { +import java.io.Closeable; + +public interface BulkMetric extends Closeable { + + void init(Settings settings); Metered getTotalIngest(); @@ -19,9 +24,9 @@ public interface BulkMetric { Count getFailed(); + long elapsed(); + void start(); void stop(); - - long elapsed(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java new file mode 100644 index 0000000..5af92e1 --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -0,0 +1,64 @@ +package org.xbib.elx.api; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; + +import java.io.Closeable; +import java.io.Flushable; +import java.util.concurrent.TimeUnit; + +public interface BulkProcessor extends Closeable, Flushable { + + BulkProcessor add(ActionRequest request); + + BulkProcessor add(ActionRequest request, Object payload); + + boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; + + boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; + + interface BulkRequestHandler { + + void execute(BulkRequest bulkRequest, long executionId); + + boolean close(long timeout, TimeUnit unit) throws InterruptedException; + + } + + /** + * A listener for the execution. + */ + public interface Listener { + + /** + * Callback before the bulk is executed. + * + * @param executionId execution ID + * @param request request + */ + void beforeBulk(long executionId, BulkRequest request); + + /** + * Callback after a successful execution of bulk request. + * + * @param executionId execution ID + * @param request request + * @param response response + */ + void afterBulk(long executionId, BulkRequest request, BulkResponse response); + + /** + * Callback after a failed execution of bulk request. + * + * Note that in case an instance of InterruptedException is passed, which means that request + * processing has been + * cancelled externally, the thread's interruption status has been restored prior to calling this method. + * + * @param executionId execution ID + * @param request request + * @param failure failure + */ + void afterBulk(long executionId, BulkRequest request, Throwable failure); + } +} diff --git a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java index 9c2f0e1..fee17bc 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/ExtendedClient.java @@ -6,15 +6,19 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; + +import java.io.Closeable; +import java.io.Flushable; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Interface for extended managing and indexing methods of an Elasticsearch client. */ -public interface ExtendedClient { +public interface ExtendedClient extends Flushable, Closeable { /** * Set an Elasticsearch client to extend from it. May be null for TransportClient. @@ -30,22 +34,6 @@ public interface ExtendedClient { */ ElasticsearchClient getClient(); - /** - * Initiative the extended client, cerates instances and connect to cluster, if required. - * - * @param settings settings - * @return this client - * @throws IOException if init fails - */ - ExtendedClient init(Settings settings) throws IOException; - - /** - * Set bulk metric. - * @param bulkMetric the bulk metric - * @return this client - */ - ExtendedClient setBulkMetric(BulkMetric bulkMetric); - /** * Get bulk metric. * @return the bulk metric @@ -53,17 +41,20 @@ public interface ExtendedClient { BulkMetric getBulkMetric(); /** - * Set bulk control. - * @param bulkControl the bulk control - * @return this + * Get buulk control. + * @return the bulk control */ - ExtendedClient setBulkControl(BulkControl bulkControl); + BulkController getBulkController(); /** - * Get buulk control. - * @return the bulk control + * Initiative the extended client, the bulk metric and bulk controller, + * creates instances and connect to cluster, if required. + * + * @param settings settings + * @return this client + * @throws IOException if init fails */ - BulkControl getBulkControl(); + ExtendedClient init(Settings settings) throws IOException; /** * Build index definition from settings. @@ -256,18 +247,12 @@ public interface ExtendedClient { * Stops bulk mode. * * @param index index - * @param maxWaitTime maximum wait time + * @param timeout maximum wait time + * @param timeUnit time unit for timeout * @return this * @throws IOException if bulk could not be stopped */ - ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException; - - /** - * Flush bulk indexing, move all pending documents to the cluster. - * - * @return this - */ - ExtendedClient flushIngest(); + ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException; /** * Update replica level. @@ -284,10 +269,11 @@ public interface ExtendedClient { * @param index index * @param level the replica level * @param maxWaitTime maximum wait time + * @param timeUnit time unit * @return this * @throws IOException if replica setting could not be updated */ - ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException; + ExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException; /** * Get replica level. @@ -330,43 +316,57 @@ public interface ExtendedClient { * Force segment merge of an index. * @param index the index * @param maxWaitTime maximum wait time + * @param timeUnit time unit * @return this */ - boolean forceMerge(String index, String maxWaitTime); + boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit); /** * Wait for all outstanding bulk responses. * - * @param maxWaitTime maximum wait time + * @param timeout maximum wait time + * @param timeUnit unit of timeout value * @return true if wait succeeded, false if wait timed out */ - boolean waitForResponses(String maxWaitTime); + boolean waitForResponses(long timeout, TimeUnit timeUnit); /** * Wait for cluster being healthy. * * @param healthColor cluster health color to wait for * @param maxWaitTime time value + * @param timeUnit time unit * @return true if wait succeeded, false if wait timed out */ - boolean waitForCluster(String healthColor, String maxWaitTime); + boolean waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); /** * Get current health color. * * @param maxWaitTime maximum wait time + * @param timeUnit time unit * @return the cluster health color */ - String getHealthColor(String maxWaitTime); + String getHealthColor(long maxWaitTime, TimeUnit timeUnit); /** * Wait for index recovery (after replica change). * * @param index index * @param maxWaitTime maximum wait time + * @param timeUnit time unit * @return true if wait succeeded, false if wait timed out */ - boolean waitForRecovery(String index, String maxWaitTime); + boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit); + + /** + * Update index setting. + * @param index the index + * @param key the key of the value to be updated + * @param value the new value + * @throws IOException if update index setting failed + */ + void updateIndexSetting(String index, String key, Object value) throws IOException; /** * Resolve alias. @@ -385,14 +385,6 @@ public interface ExtendedClient { */ String resolveMostRecentIndex(String alias); - /** - * Get all alias filters. - * - * @param alias the alias - * @return map of alias filters - */ - Map getAliasFilters(String alias); - /** * Get all index filters. * @param index the index @@ -401,48 +393,49 @@ public interface ExtendedClient { Map getIndexFilters(String index); /** - * Switch from one index to another. + * Shift from one index to another. * @param indexDefinition the index definition - * @param extraAliases new aliases + * @param additionalAliases new aliases * @return this */ - ExtendedClient switchIndex(IndexDefinition indexDefinition, List extraAliases); + IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases); /** - * Switch from one index to another. + * Shift from one index to another. * @param indexDefinition the index definition - * @param extraAliases new aliases + * @param additionalAliases new aliases * @param indexAliasAdder method to add aliases * @return this */ - ExtendedClient switchIndex(IndexDefinition indexDefinition, List extraAliases, IndexAliasAdder indexAliasAdder); + IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases, + IndexAliasAdder indexAliasAdder); /** - * Switch from one index to another. - * + * Shift from one index to another. * @param index the index name * @param fullIndexName the index name with timestamp - * @param extraAliases a list of names that should be set as index aliases + * @param additionalAliases a list of names that should be set as index aliases * @return this */ - ExtendedClient switchIndex(String index, String fullIndexName, List extraAliases); + IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases); /** - * Switch from one index to another. - * + * Shift from one index to another. * @param index the index name * @param fullIndexName the index name with timestamp - * @param extraAliases a list of names that should be set as index aliases + * @param additionalAliases a list of names that should be set as index aliases * @param adder an adder method to create alias term queries * @return this */ - ExtendedClient switchIndex(String index, String fullIndexName, List extraAliases, IndexAliasAdder adder); + IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases, + IndexAliasAdder adder); /** * Prune index. * @param indexDefinition the index definition + * @return the index prune result */ - void pruneIndex(IndexDefinition indexDefinition); + IndexPruneResult pruneIndex(IndexDefinition indexDefinition); /** * Apply retention policy to prune indices. All indices before delta should be deleted, @@ -452,8 +445,10 @@ public interface ExtendedClient { * @param fullIndexName index name with timestamp * @param delta timestamp delta (for index timestamps) * @param mintokeep minimum number of indices to keep + * @param perform true if pruning should be executed, false if not + * @return the index prune result */ - void pruneIndex(String index, String fullIndexName, int delta, int mintokeep); + IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform); /** * Find the timestamp of the most recently indexed document in the index. @@ -470,24 +465,4 @@ public interface ExtendedClient { * @return the cluster name */ String getClusterName(); - - /** - * Returns true is a throwable exists. - * - * @return true if a Throwable exists - */ - boolean hasThrowable(); - - /** - * Return last throwable if exists. - * - * @return last throwable - */ - Throwable getThrowable(); - - /** - * Shutdown the client. - * @throws IOException if shutdown fails - */ - void shutdown() throws IOException; } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 7e12592..49544a7 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -2,148 +2,69 @@ package org.xbib.elx.api; import java.net.MalformedURLException; import java.net.URL; +import java.util.concurrent.TimeUnit; -public class IndexDefinition { +public interface IndexDefinition { - private String index; + IndexDefinition setIndex(String index); - private String fullIndexName; + String getIndex(); - private String dateTimePattern; + IndexDefinition setFullIndexName(String fullIndexName); - private URL settingsUrl; + String getFullIndexName(); - private URL mappingsUrl; + IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException; - private boolean enabled; + IndexDefinition setSettingsUrl(URL settingsUrl); - private boolean ignoreErrors; + URL getSettingsUrl(); - private boolean switchAliases; + IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException; - private boolean hasForceMerge; + IndexDefinition setMappingsUrl(URL mappingsUrl); - private int replicaLevel; + URL getMappingsUrl(); - private IndexRetention indexRetention; + IndexDefinition setDateTimePattern(String timeWindow); - private String maxWaitTime; + String getDateTimePattern(); - public IndexDefinition setIndex(String index) { - this.index = index; - return this; - } + IndexDefinition setEnabled(boolean enabled); - public String getIndex() { - return index; - } + boolean isEnabled(); - public IndexDefinition setFullIndexName(String fullIndexName) { - this.fullIndexName = fullIndexName; - return this; - } + IndexDefinition setIgnoreErrors(boolean ignoreErrors); - public String getFullIndexName() { - return fullIndexName; - } + boolean ignoreErrors(); - public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException { - this.settingsUrl = settingsUrlString != null ? new URL(settingsUrlString) : null; - return this; - } + IndexDefinition setShift(boolean shift); - public IndexDefinition setSettingsUrl(URL settingsUrl) { - this.settingsUrl = settingsUrl; - return this; - } + boolean isShiftEnabled(); - public URL getSettingsUrl() { - return settingsUrl; - } + IndexDefinition setForceMerge(boolean hasForceMerge); - public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException { - this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null; - return this; - } + boolean hasForceMerge(); - public IndexDefinition setMappingsUrl(URL mappingsUrl) { - this.mappingsUrl = mappingsUrl; - return this; - } + IndexDefinition setReplicaLevel(int replicaLevel); - public URL getMappingsUrl() { - return mappingsUrl; - } + int getReplicaLevel(); - public IndexDefinition setDateTimePattern(String timeWindow) { - this.dateTimePattern = timeWindow; - return this; - } + IndexDefinition setRetention(IndexRetention indexRetention); - public String getDateTimePattern() { - return dateTimePattern; - } + IndexRetention getRetention(); - public IndexDefinition setEnabled(boolean enabled) { - this.enabled = enabled; - return this; - } + IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit); - public boolean isEnabled() { - return enabled; - } + long getMaxWaitTime(); - public IndexDefinition setIgnoreErrors(boolean ignoreErrors) { - this.ignoreErrors = ignoreErrors; - return this; - } + TimeUnit getMaxWaitTimeUnit(); - public boolean ignoreErrors() { - return ignoreErrors; - } + IndexDefinition setStartRefreshInterval(long seconds); - public IndexDefinition setSwitchAliases(boolean switchAliases) { - this.switchAliases = switchAliases; - return this; - } + long getStartRefreshInterval(); - public boolean isSwitchAliases() { - return switchAliases; - } + IndexDefinition setStopRefreshInterval(long seconds); - public IndexDefinition setForceMerge(boolean hasForceMerge) { - this.hasForceMerge = hasForceMerge; - return this; - } - - public boolean hasForceMerge() { - return hasForceMerge; - } - - public IndexDefinition setReplicaLevel(int replicaLevel) { - this.replicaLevel = replicaLevel; - return this; - } - - public int getReplicaLevel() { - return replicaLevel; - } - - public IndexDefinition setRetention(IndexRetention indexRetention) { - this.indexRetention = indexRetention; - return this; - } - - public IndexRetention getRetention() { - return indexRetention; - } - - public IndexDefinition setMaxWaitTime(String maxWaitTime) { - this.maxWaitTime = maxWaitTime; - return this; - } - - public String getMaxWaitTime() { - return maxWaitTime; - } + long getStopRefreshInterval(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java new file mode 100644 index 0000000..0c118f8 --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexPruneResult.java @@ -0,0 +1,16 @@ +package org.xbib.elx.api; + +import java.util.List; + +public interface IndexPruneResult { + + enum State { NOTHING_TO_DO, SUCCESS, NONE }; + + State getState(); + + List getCandidateIndices(); + + List getDeletedIndices(); + + boolean isAcknowledged(); +} diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java b/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java index e1445fc..44116e2 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java @@ -1,27 +1,13 @@ package org.xbib.elx.api; -public class IndexRetention { +public interface IndexRetention { - private int timestampDiff; + IndexRetention setDelta(int delta); - private int minToKeep; + int getDelta(); - public IndexRetention setDelta(int timestampDiff) { - this.timestampDiff = timestampDiff; - return this; - } + IndexRetention setMinToKeep(int minToKeep); - public int getDelta() { - return timestampDiff; - } - - public IndexRetention setMinToKeep(int minToKeep) { - this.minToKeep = minToKeep; - return this; - } - - public int getMinToKeep() { - return minToKeep; - } + int getMinToKeep(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexShiftResult.java b/elx-api/src/main/java/org/xbib/elx/api/IndexShiftResult.java new file mode 100644 index 0000000..02a2e8c --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexShiftResult.java @@ -0,0 +1,10 @@ +package org.xbib.elx.api; + +import java.util.List; + +public interface IndexShiftResult { + + List getMovedAliases(); + + List getNewAliases(); +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java index e9a2230..21f2730 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java @@ -4,7 +4,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; @@ -44,16 +43,12 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -62,7 +57,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilder; @@ -71,12 +65,14 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.xbib.elx.api.BulkControl; +import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.ExtendedClient; import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexRetention; +import org.xbib.elx.api.IndexShiftResult; import java.io.IOException; import java.io.InputStream; @@ -99,6 +95,7 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -120,27 +117,58 @@ public abstract class AbstractExtendedClient implements ExtendedClient { */ private ElasticsearchClient client; - /** - * Our replacement for the buk processor. - */ - private BulkProcessor bulkProcessor; - private BulkMetric bulkMetric; - private BulkControl bulkControl; + private BulkController bulkController; - private Throwable throwable; + private AtomicBoolean closed; - private boolean closed; + private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() { + @Override + public List getMovedAliases() { + return Collections.emptyList(); + } + + @Override + public List getNewAliases() { + return Collections.emptyList(); + } + }; + + private static final IndexPruneResult EMPTY_INDEX_PRUNE_RESULT = new IndexPruneResult() { + @Override + public State getState() { + return State.NONE; + } + + @Override + public List getCandidateIndices() { + return Collections.emptyList(); + } + + @Override + public List getDeletedIndices() { + return Collections.emptyList(); + } + + @Override + public boolean isAcknowledged() { + return false; + } + }; protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; protected AbstractExtendedClient() { + closed = new AtomicBoolean(false); } @Override public AbstractExtendedClient setClient(ElasticsearchClient client) { this.client = client; + this.bulkMetric = new DefaultBulkMetric(); + bulkMetric.start(); + this.bulkController = new DefaultBulkController(this, bulkMetric); return this; } @@ -149,28 +177,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return client; } - @Override - public AbstractExtendedClient setBulkMetric(BulkMetric metric) { - this.bulkMetric = metric; - // you must start bulk metric or it will bail out at stop() - bulkMetric.start(); - return this; - } - @Override public BulkMetric getBulkMetric() { return bulkMetric; } @Override - public AbstractExtendedClient setBulkControl(BulkControl bulkControl) { - this.bulkControl = bulkControl; - return this; - } - - @Override - public BulkControl getBulkControl() { - return bulkControl; + public BulkController getBulkController() { + return bulkController; } @Override @@ -181,120 +195,33 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (bulkMetric != null) { bulkMetric.start(); } - BulkProcessor.Listener listener = new BulkProcessor.Listener() { - - private final Logger logger = LogManager.getLogger("org.xbib.elx.BulkProcessor.Listener"); - - @Override - public void beforeBulk(long executionId, BulkRequest request) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().inc(); - int n = request.numberOfActions(); - bulkMetric.getSubmitted().inc(n); - bulkMetric.getCurrentIngestNumDocs().inc(n); - bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); - } - logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", - executionId, - request.numberOfActions(), - request.estimatedSizeInBytes(), - l); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().dec(); - bulkMetric.getSucceeded().inc(response.getItems().length); - } - int n = 0; - for (BulkItemResponse itemResponse : response.getItems()) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); - } - if (itemResponse.isFailed()) { - n++; - if (bulkMetric != null) { - bulkMetric.getSucceeded().dec(1); - bulkMetric.getFailed().inc(1); - } - } - } - if (bulkMetric != null) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", - executionId, - bulkMetric.getSucceeded().getCount(), - bulkMetric.getFailed().getCount(), - response.getTook().millis(), - l); - } - if (n > 0) { - logger.error("bulk [{}] failed with {} failed items, failure message = {}", - executionId, n, response.buildFailureMessage()); - } else { - if (bulkMetric != null) { - bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); - } - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(); - } - throwable = failure; - closed = true; - logger.error("after bulk [" + executionId + "] error", failure); - } - }; - if (client != null) { - int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), - Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); - int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), - Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); - TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), - TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), - ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), - "maxVolumePerRequest")); - logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + - "flushIngestInterval = {} maxVolumePerRequest = {}", - maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest); - BulkProcessor.Builder builder = BulkProcessor.builder((Client) client, listener) - .setBulkActions(maxActionsPerRequest) - .setConcurrentRequests(maxConcurrentRequests) - .setFlushInterval(flushIngestInterval) - .setBulkSize(maxVolumePerRequest); - this.bulkProcessor = builder.build(); - } - this.closed = false; - this.throwable = null; + if (bulkController != null) { + bulkController.init(settings); + } return this; } @Override - public synchronized void shutdown() throws IOException { - ensureActive(); - if (bulkProcessor != null) { - logger.info("closing bulk processor"); - bulkProcessor.close(); - } - if (bulkMetric != null) { - logger.info("stopping metric before bulk stop (for precise measurement)"); - bulkMetric.stop(); + public void flush() throws IOException { + if (bulkController != null) { + bulkController.flush(); } - if (bulkControl != null && bulkControl.indices() != null && !bulkControl.indices().isEmpty()) { - logger.info("stopping bulk mode for indices {}...", bulkControl.indices()); - for (String index : bulkControl.indices()) { - stopBulk(index, bulkControl.getMaxWaitTime()); + } + + @Override + public void close() throws IOException { + ensureActive(); + if (closed.compareAndSet(false, true)) { + if (bulkMetric != null) { + logger.info("closing bulk metric before bulk controller (for precise measurement)"); + bulkMetric.close(); } + if (bulkController != null) { + logger.info("closing bulk controller"); + bulkController.close(); + } + logger.info("shutdown complete"); } - logger.info("shutdown complete"); } @Override @@ -320,7 +247,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException { ensureActive(); - waitForCluster("YELLOW", "30s"); + waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); URL indexSettings = indexDefinition.getSettingsUrl(); if (indexSettings == null) { logger.warn("warning while creating index '{}', no settings/mappings", @@ -417,37 +344,27 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException { - ensureActive(); - if (bulkControl == null) { - return this; - } - if (!bulkControl.isBulk(index) && startRefreshIntervalSeconds > 0L && stopRefreshIntervalSeconds > 0L) { - bulkControl.startBulk(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); - updateIndexSetting(index, "refresh_interval", startRefreshIntervalSeconds + "s"); + if (bulkController != null) { + ensureActive(); + bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); } return this; } @Override public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException { - return stopBulk(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime()); + if (bulkController != null) { + ensureActive(); + bulkController.stopBulkMode(indexDefinition); + } + return this; } @Override - public ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException { - ensureActive(); - if (bulkControl == null) { - return this; - } - flushIngest(); - if (waitForResponses(maxWaitTime)) { - if (bulkControl.isBulk(index)) { - long secs = bulkControl.getStopBulkRefreshIntervals().get(index); - if (secs > 0L) { - updateIndexSetting(index, "refresh_interval", secs + "s"); - } - bulkControl.finishBulk(index); - } + public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { + if (bulkController != null) { + ensureActive(); + bulkController.stopBulkMode(index, timeout, timeUnit); } return this; } @@ -465,16 +382,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient index(IndexRequest indexRequest) { ensureActive(); - try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); - } - bulkProcessor.add(indexRequest); - } catch (Exception e) { - throwable = e; - closed = true; - logger.error("bulk add of index request failed: " + e.getMessage(), e); - } + bulkController.index(indexRequest); return this; } @@ -486,16 +394,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient delete(DeleteRequest deleteRequest) { ensureActive(); - try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); - } - bulkProcessor.add(deleteRequest); - } catch (Exception e) { - throwable = e; - closed = true; - logger.error("bulk add of delete failed: " + e.getMessage(), e); - } + bulkController.delete(deleteRequest); return this; } @@ -512,49 +411,23 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient update(UpdateRequest updateRequest) { ensureActive(); - try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); - } - bulkProcessor.add(updateRequest); - } catch (Exception e) { - throwable = e; - closed = true; - logger.error("bulk add of update request failed: " + e.getMessage(), e); - } + bulkController.update(updateRequest); return this; } @Override - public ExtendedClient flushIngest() { + public boolean waitForResponses(long timeout, TimeUnit timeUnit) { ensureActive(); - logger.debug("flushing bulk processor"); - bulkProcessor.flush(); - return this; + return bulkController.waitForResponses(timeout, timeUnit); } @Override - public boolean waitForResponses(String maxWaitTime) { - ensureActive(); - long millis = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueMinutes(1),"millis").getMillis(); - logger.debug("waiting for " + millis + " millis"); - try { - return bulkProcessor.awaitFlush(millis, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("interrupted"); - return false; - } - } - - @Override - public boolean waitForRecovery(String index, String maxWaitTime) { + public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { ensureActive(); ensureIndexGiven(index); RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, new RecoveryRequest(index)).actionGet(); int shards = response.getTotalShards(); - TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), - getClass().getSimpleName() + ".timeout"); + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest(index) .waitForActiveShards(shards).timeout(timeout)).actionGet(); @@ -566,26 +439,26 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public boolean waitForCluster(String statusString, String maxWaitTime) { + public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { ensureActive(); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); - TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), - getClass().getSimpleName() + ".timeout"); + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { - logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name()); + if (logger.isErrorEnabled()) { + logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name()); + } return false; } return true; } @Override - public String getHealthColor(String maxWaitTime) { + public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { ensureActive(); try { - TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), - getClass().getSimpleName() + ".timeout"); + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().timeout(timeout)).actionGet(); ClusterHealthStatus status = healthResponse.getStatus(); @@ -604,15 +477,16 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { - return updateReplicaLevel(indexDefinition.getFullIndexName(), level, indexDefinition.getMaxWaitTime()); + return updateReplicaLevel(indexDefinition.getFullIndexName(), level, + indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); } @Override - public ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException { - waitForCluster("YELLOW", maxWaitTime); // let cluster settle down from critical operations + public ExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException { + waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations if (level > 0) { updateIndexSetting(index, "number_of_replicas", level); - waitForRecovery(index, maxWaitTime); + waitForRecovery(index, maxWaitTime, timeUnit); } return this; } @@ -684,12 +558,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return indices.isEmpty() ? alias : indices.iterator().next(); } - @Override - public Map getAliasFilters(String alias) { - GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE); - return getFilters(getAliasesRequestBuilder.setIndices(resolveAlias(alias)).execute().actionGet()); - } - @Override public Map getIndexFilters(String index) { GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE); @@ -697,50 +565,49 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient switchIndex(IndexDefinition indexDefinition, List extraAliases) { - return switchIndex(indexDefinition, extraAliases, null); + public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases) { + return shiftIndex(indexDefinition, additionalAliases, null); } - @Override - public ExtendedClient switchIndex(IndexDefinition indexDefinition, - List extraAliases, IndexAliasAdder indexAliasAdder) { - if (extraAliases == null) { - return this; - } - if (indexDefinition.isSwitchAliases()) { - switchIndex(indexDefinition.getIndex(), - indexDefinition.getFullIndexName(), extraAliases.stream() + public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, + List additionalAliases, IndexAliasAdder indexAliasAdder) { + if (additionalAliases == null) { + return EMPTY_INDEX_SHIFT_RESULT; + } + if (indexDefinition.isShiftEnabled()) { + return shiftIndex(indexDefinition.getIndex(), + indexDefinition.getFullIndexName(), additionalAliases.stream() .filter(a -> a != null && !a.isEmpty()) .collect(Collectors.toList()), indexAliasAdder); } - return this; + return EMPTY_INDEX_SHIFT_RESULT; } @Override - public ExtendedClient switchIndex(String index, String fullIndexName, List extraAliases) { - return switchIndex(index, fullIndexName, extraAliases, null); + public IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases) { + return shiftIndex(index, fullIndexName, additionalAliases, null); } @Override - public ExtendedClient switchIndex(String index, String fullIndexName, - List extraAliases, IndexAliasAdder adder) { + public IndexShiftResult shiftIndex(String index, String fullIndexName, + List additionalAliases, IndexAliasAdder adder) { ensureActive(); if (index.equals(fullIndexName)) { - return this; // nothing to switch to + return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to } // two situations: 1. there is a new alias 2. there is already an old index with the alias String oldIndex = resolveAlias(index); final Map oldFilterMap = oldIndex.equals(index) ? null : getIndexFilters(oldIndex); final List newAliases = new LinkedList<>(); - final List switchAliases = new LinkedList<>(); + final List moveAliases = new LinkedList<>(); IndicesAliasesRequestBuilder requestBuilder = new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE); if (oldFilterMap == null || !oldFilterMap.containsKey(index)) { // never apply a filter for trunk index name requestBuilder.addAlias(fullIndexName, index); newAliases.add(index); } - // switch existing aliases + // move existing aliases if (oldFilterMap != null) { for (Map.Entry entry : oldFilterMap.entrySet()) { String alias = entry.getKey(); @@ -751,12 +618,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } else { requestBuilder.addAlias(fullIndexName, alias); } - switchAliases.add(alias); + moveAliases.add(alias); } } // a list of aliases that should be added, check if new or old - if (extraAliases != null) { - for (String extraAlias : extraAliases) { + if (additionalAliases != null) { + for (String extraAlias : additionalAliases) { if (oldFilterMap == null || !oldFilterMap.containsKey(extraAlias)) { // index alias adder only active on extra aliases, and if alias is new if (adder != null) { @@ -773,82 +640,72 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } else { requestBuilder.addAlias(fullIndexName, extraAlias); } - switchAliases.add(extraAlias); + moveAliases.add(extraAlias); } } } - if (!newAliases.isEmpty() || !switchAliases.isEmpty()) { - logger.info("new aliases = {}, switch aliases = {}", newAliases, switchAliases); + if (!newAliases.isEmpty() || !moveAliases.isEmpty()) { + logger.info("new aliases = {}, moved aliases = {}", newAliases, moveAliases); requestBuilder.execute().actionGet(); } - return this; + return new SuccessIndexShiftResult(moveAliases, newAliases); } @Override - public void pruneIndex(IndexDefinition indexDefinition) { - pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), - indexDefinition.getRetention().getDelta(), indexDefinition.getRetention().getMinToKeep()); + public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { + return pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), + indexDefinition.getRetention().getDelta(), indexDefinition.getRetention().getMinToKeep(), true); } @Override - public void pruneIndex(String index, String fullIndexName, int delta, int mintokeep) { + public IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform) { if (delta == 0 && mintokeep == 0) { - return; + return EMPTY_INDEX_PRUNE_RESULT; } - ensureActive(); if (index.equals(fullIndexName)) { - return; + return EMPTY_INDEX_PRUNE_RESULT; } + ensureActive(); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); - Set indices = new TreeSet<>(); logger.info("{} indices", getIndexResponse.getIndices().length); + List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); if (m.matches() && index.equals(m.group(1)) && !s.equals(fullIndexName)) { - indices.add(s); + candidateIndices.add(s); } } - if (indices.isEmpty()) { - logger.info("no indices found, retention policy skipped"); - return; + if (candidateIndices.isEmpty()) { + return EMPTY_INDEX_PRUNE_RESULT; } - if (mintokeep > 0 && indices.size() <= mintokeep) { - logger.info("{} indices found, not enough for retention policy ({}), skipped", - indices.size(), mintokeep); - return; - } else { - logger.info("candidates for deletion = {}", indices); + if (mintokeep > 0 && candidateIndices.size() <= mintokeep) { + return new NothingToDoPruneResult(candidateIndices, Collections.emptyList()); } List indicesToDelete = new ArrayList<>(); - // our index Matcher m1 = pattern.matcher(fullIndexName); if (m1.matches()) { Integer i1 = Integer.parseInt(m1.group(2)); - for (String s : indices) { + for (String s : candidateIndices) { Matcher m2 = pattern.matcher(s); if (m2.matches()) { Integer i2 = Integer.parseInt(m2.group(2)); - int kept = indices.size() - indicesToDelete.size(); + int kept = candidateIndices.size() - indicesToDelete.size(); if ((delta == 0 || (delta > 0 && i1 - i2 > delta)) && mintokeep <= kept) { indicesToDelete.add(s); } } } } - logger.info("indices to delete = {}", indicesToDelete); if (indicesToDelete.isEmpty()) { - logger.info("not enough indices found to delete, retention policy complete"); - return; + return new NothingToDoPruneResult(candidateIndices, indicesToDelete); } String[] s = new String[indicesToDelete.size()]; DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest() .indices(indicesToDelete.toArray(s)); DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); - if (!response.isAcknowledged()) { - logger.warn("retention delete index operation was not acknowledged"); - } + return new SuccessPruneResult(candidateIndices, indicesToDelete, response); } @Override @@ -875,15 +732,15 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public boolean forceMerge(IndexDefinition indexDefinition) { if (indexDefinition.hasForceMerge()) { - return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime()); + return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime(), + indexDefinition.getMaxWaitTimeUnit()); } return false; } @Override - public boolean forceMerge(String index, String maxWaitTime) { - TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10), - getClass().getSimpleName() + ".timeout"); + public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ForceMergeRequestBuilder forceMergeRequestBuilder = new ForceMergeRequestBuilder(client, ForceMergeAction.INSTANCE); forceMergeRequestBuilder.setIndices(index); @@ -909,44 +766,35 @@ public abstract class AbstractExtendedClient implements ExtendedClient { String fullIndexName; String dateTimePattern = settings.get("dateTimePattern"); if (dateTimePattern != null) { - fullIndexName = resolveAlias(indexName + - DateTimeFormatter.ofPattern(dateTimePattern) + // check if index name with current date already exists, resolve to it + fullIndexName = resolveAlias(indexName + DateTimeFormatter.ofPattern(dateTimePattern) .withZone(ZoneId.systemDefault()) // not GMT .format(LocalDate.now())); - logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName); } else { + // check if index name already exists, resolve to it fullIndexName = resolveMostRecentIndex(indexName); - logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName); } - IndexRetention indexRetention = new IndexRetention() + IndexRetention indexRetention = new DefaultIndexRetention() .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) .setDelta(settings.getAsInt("retention.delta", 0)); - - return new IndexDefinition() + return new DefaultIndexDefinition() + .setEnabled(isEnabled) .setIndex(indexName) .setFullIndexName(fullIndexName) .setSettingsUrl(settings.get("settings")) .setMappingsUrl(settings.get("mapping")) .setDateTimePattern(dateTimePattern) - .setEnabled(isEnabled) .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) - .setSwitchAliases(settings.getAsBoolean("aliases", true)) + .setShift(settings.getAsBoolean("shift", true)) .setReplicaLevel(settings.getAsInt("replica", 0)) - .setMaxWaitTime(settings.get("timout", "30s")) - .setRetention(indexRetention); + .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) + .setRetention(indexRetention) + .setStartRefreshInterval(settings.getAsLong("bulk.startrefreshinterval", -1L)) + .setStopRefreshInterval(settings.getAsLong("bulk.stoprefreshinterval", -1L)); } @Override - public boolean hasThrowable() { - return throwable != null; - } - - @Override - public Throwable getThrowable() { - return throwable; - } - - private void updateIndexSetting(String index, String key, Object value) throws IOException { + public void updateIndexSetting(String index, String key, Object value) throws IOException { ensureActive(); if (index == null) { throw new IOException("no index name given"); @@ -971,9 +819,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (client == null) { throw new IllegalStateException("no client"); } - if (closed) { - throw new ElasticsearchException("client is closed"); - } } private void ensureIndexGiven(String index) { @@ -1096,4 +941,115 @@ public abstract class AbstractExtendedClient implements ExtendedClient { .forEachOrdered(e -> result.put(e.getKey(), e.getValue())); return result; } + + private static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) { + switch (timeUnit) { + case DAYS: + return TimeValue.timeValueHours(24 * timeValue); + case HOURS: + return TimeValue.timeValueHours(timeValue); + case MINUTES: + return TimeValue.timeValueMinutes(timeValue); + case SECONDS: + return TimeValue.timeValueSeconds(timeValue); + case MILLISECONDS: + return TimeValue.timeValueMillis(timeValue); + case MICROSECONDS: + return TimeValue.timeValueNanos(1000 * timeValue); + case NANOSECONDS: + return TimeValue.timeValueNanos(timeValue); + default: + throw new IllegalArgumentException("unknown time unit: " + timeUnit); + } + } + + private static class SuccessIndexShiftResult implements IndexShiftResult { + + List movedAliases; + + List newAliases; + + SuccessIndexShiftResult(List movedAliases, List newAliases) { + this.movedAliases = movedAliases; + this.newAliases = newAliases; + } + + @Override + public List getMovedAliases() { + return movedAliases; + } + + @Override + public List getNewAliases() { + return newAliases; + } + } + + private static class SuccessPruneResult implements IndexPruneResult { + + List candidateIndices; + + List indicesToDelete; + + DeleteIndexResponse response; + + SuccessPruneResult(List candidateIndices, List indicesToDelete, + DeleteIndexResponse response) { + this.candidateIndices = candidateIndices; + this.indicesToDelete = indicesToDelete; + this.response = response; + } + + @Override + public IndexPruneResult.State getState() { + return IndexPruneResult.State.SUCCESS; + } + + @Override + public List getCandidateIndices() { + return candidateIndices; + } + + @Override + public List getDeletedIndices() { + return indicesToDelete; + } + + @Override + public boolean isAcknowledged() { + return response.isAcknowledged(); + } + } + + private static class NothingToDoPruneResult implements IndexPruneResult { + + List candidateIndices; + + List indicesToDelete; + + NothingToDoPruneResult(List candidateIndices, List indicesToDelete) { + this.candidateIndices = candidateIndices; + this.indicesToDelete = indicesToDelete; + } + + @Override + public IndexPruneResult.State getState() { + return IndexPruneResult.State.SUCCESS; + } + + @Override + public List getCandidateIndices() { + return candidateIndices; + } + + @Override + public List getDeletedIndices() { + return indicesToDelete; + } + + @Override + public boolean isAcknowledged() { + return false; + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index cc15697..ba9150f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -5,8 +5,6 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.xbib.elx.api.BulkControl; -import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.ExtendedClient; import org.xbib.elx.api.ExtendedClientProvider; @@ -26,10 +24,6 @@ public class ClientBuilder { private Class provider; - private BulkMetric metric; - - private BulkControl control; - public ClientBuilder() { this(null); } @@ -48,8 +42,6 @@ public class ClientBuilder { for (ExtendedClientProvider provider : serviceLoader) { providerMap.put(provider.getClass(), provider); } - this.metric = new SimpleBulkMetric(); - this.control = new SimpleBulkControl(); } public static ClientBuilder builder() { @@ -100,25 +92,11 @@ public class ClientBuilder { return this; } - public ClientBuilder setMetric(BulkMetric metric) { - this.metric = metric; - return this; - } - - public ClientBuilder setControl(BulkControl control) { - this.control = control; - return this; - } - @SuppressWarnings("unchecked") public C build() throws IOException { if (provider == null) { throw new IllegalArgumentException("no provider"); } - return (C) providerMap.get(provider).getExtendedClient() - .setClient(client) - .setBulkMetric(metric) - .setBulkControl(control) - .init(settingsBuilder.build()); + return (C) providerMap.get(provider).getExtendedClient().setClient(client).init(settingsBuilder.build()); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java new file mode 100644 index 0000000..ca705c4 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -0,0 +1,309 @@ +package org.xbib.elx.common; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkMetric; +import org.xbib.elx.api.BulkProcessor; +import org.xbib.elx.api.ExtendedClient; +import org.xbib.elx.api.IndexDefinition; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DefaultBulkController implements BulkController { + + private static final Logger logger = LogManager.getLogger(DefaultBulkController.class); + + private final ExtendedClient client; + + private final BulkMetric bulkMetric; + + private final List indexNames; + + private final Map startBulkRefreshIntervals; + + private final Map stopBulkRefreshIntervals; + + private long maxWaitTime; + + private TimeUnit maxWaitTimeUnit; + + private BulkProcessor bulkProcessor; + + private BulkListener bulkListener; + + private AtomicBoolean active; + + public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) { + this.client = client; + this.bulkMetric = bulkMetric; + this.indexNames = new ArrayList<>(); + this.active = new AtomicBoolean(false); + this.startBulkRefreshIntervals = new HashMap<>(); + this.stopBulkRefreshIntervals = new HashMap<>(); + this.maxWaitTime = 30L; + this.maxWaitTimeUnit = TimeUnit.SECONDS; + } + + @Override + public Throwable getLastBulkError() { + return bulkListener.getLastBulkError(); + } + + @Override + public void init(Settings settings) { + int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), + Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); + int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), + Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); + TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), + TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), + ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), + "maxVolumePerRequest")); + if (logger.isInfoEnabled()) { + logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + + "flushIngestInterval = {} maxVolumePerRequest = {}", + maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest); + } + this.bulkListener = new BulkListener(); + DefaultBulkProcessor.Builder builder = DefaultBulkProcessor.builder((Client) client.getClient(), bulkListener) + .setBulkActions(maxActionsPerRequest) + .setConcurrentRequests(maxConcurrentRequests) + .setFlushInterval(flushIngestInterval) + .setBulkSize(maxVolumePerRequest); + this.bulkProcessor = builder.build(); + this.active.set(true); + } + + @Override + public void startBulkMode(IndexDefinition indexDefinition) throws IOException { + startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(), + indexDefinition.getStopRefreshInterval()); + } + + @Override + public void startBulkMode(String indexName, + long startRefreshIntervalInSeconds, + long stopRefreshIntervalInSeconds) throws IOException { + if (!indexNames.contains(indexName)) { + indexNames.add(indexName); + startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds); + stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds); + if (startRefreshIntervalInSeconds != 0L) { + client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s"); + } + } + } + + @Override + public void index(IndexRequest indexRequest) { + if (!active.get()) { + throw new IllegalStateException("inactive"); + } + try { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); + } + bulkProcessor.add(indexRequest); + } catch (Exception e) { + bulkListener.lastBulkError = e; + active.set(false); + if (logger.isErrorEnabled()) { + logger.error("bulk add of index failed: " + e.getMessage(), e); + } + } + } + + @Override + public void delete(DeleteRequest deleteRequest) { + if (!active.get()) { + throw new IllegalStateException("inactive"); + } + try { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); + } + bulkProcessor.add(deleteRequest); + } catch (Exception e) { + bulkListener.lastBulkError = e; + active.set(false); + if (logger.isErrorEnabled()) { + logger.error("bulk add of delete failed: " + e.getMessage(), e); + } + } + } + + @Override + public void update(UpdateRequest updateRequest) { + if (!active.get()) { + throw new IllegalStateException("inactive"); + } + try { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); + } + bulkProcessor.add(updateRequest); + } catch (Exception e) { + bulkListener.lastBulkError = e; + active.set(false); + if (logger.isErrorEnabled()) { + logger.error("bulk add of update failed: " + e.getMessage(), e); + } + } + } + + @Override + public boolean waitForResponses(long timeout, TimeUnit timeUnit) { + try { + return bulkProcessor.awaitFlush(timeout, timeUnit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted"); + return false; + } + } + + @Override + public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { + stopBulkMode(indexDefinition.getFullIndexName(), + indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); + } + + @Override + public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException { + flush(); + if (waitForResponses(timeout, timeUnit)) { + if (indexNames.contains(index)) { + Long secs = stopBulkRefreshIntervals.get(index); + if (secs != null && secs != 0L) { + client.updateIndexSetting(index, "refresh_interval", secs + "s"); + } + indexNames.remove(index); + } + } + } + + @Override + public void flush() throws IOException { + if (bulkProcessor != null) { + bulkProcessor.flush(); + } + } + + @Override + public void close() throws IOException { + flush(); + if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { + for (String index : indexNames) { + Long secs = stopBulkRefreshIntervals.get(index); + if (secs != null && secs != 0L) + client.updateIndexSetting(index, "refresh_interval", secs + "s"); + } + indexNames.clear(); + } + if (bulkProcessor != null) { + bulkProcessor.close(); + } + } + + private class BulkListener implements DefaultBulkProcessor.Listener { + + private final Logger logger = LogManager.getLogger("org.xbib.elx.BulkProcessor.Listener"); + + private Throwable lastBulkError = null; + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + long l = 0; + if (bulkMetric != null) { + l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().inc(); + int n = request.numberOfActions(); + bulkMetric.getSubmitted().inc(n); + bulkMetric.getCurrentIngestNumDocs().inc(n); + bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); + } + if (logger.isDebugEnabled()) { + logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", + executionId, + request.numberOfActions(), + request.estimatedSizeInBytes(), + l); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + long l = 0; + if (bulkMetric != null) { + l = bulkMetric.getCurrentIngest().getCount(); + bulkMetric.getCurrentIngest().dec(); + bulkMetric.getSucceeded().inc(response.getItems().length); + } + int n = 0; + for (BulkItemResponse itemResponse : response.getItems()) { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); + } + if (itemResponse.isFailed()) { + n++; + if (bulkMetric != null) { + bulkMetric.getSucceeded().dec(1); + bulkMetric.getFailed().inc(1); + } + } + } + if (bulkMetric != null && logger.isDebugEnabled()) { + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", + executionId, + bulkMetric.getSucceeded().getCount(), + bulkMetric.getFailed().getCount(), + response.getTook().millis(), + l); + } + if (n > 0) { + if (logger.isErrorEnabled()) { + logger.error("bulk [{}] failed with {} failed items, failure message = {}", + executionId, n, response.buildFailureMessage()); + } + } else { + if (bulkMetric != null) { + bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); + } + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + if (bulkMetric != null) { + bulkMetric.getCurrentIngest().dec(); + } + lastBulkError = failure; + active.set(false); + if (logger.isErrorEnabled()) { + logger.error("after bulk [" + executionId + "] error", failure); + } + } + + Throwable getLastBulkError() { + return lastBulkError; + } + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java similarity index 78% rename from elx-common/src/main/java/org/xbib/elx/common/SimpleBulkMetric.java rename to elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 1a9dd35..a956c4d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,5 +1,6 @@ package org.xbib.elx.common; +import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkMetric; import org.xbib.metrics.Count; import org.xbib.metrics.CountMetric; @@ -7,9 +8,8 @@ import org.xbib.metrics.Meter; import org.xbib.metrics.Metered; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -public class SimpleBulkMetric implements BulkMetric { +public class DefaultBulkMetric implements BulkMetric { private final Meter totalIngest; @@ -29,12 +29,8 @@ public class SimpleBulkMetric implements BulkMetric { private Long stopped; - public SimpleBulkMetric() { - this(Executors.newSingleThreadScheduledExecutor()); - } - - public SimpleBulkMetric(ScheduledExecutorService executorService) { - totalIngest = new Meter(executorService); + public DefaultBulkMetric() { + totalIngest = new Meter(Executors.newSingleThreadScheduledExecutor()); totalIngestSizeInBytes = new CountMetric(); currentIngest = new CountMetric(); currentIngestNumDocs = new CountMetric(); @@ -43,6 +39,11 @@ public class SimpleBulkMetric implements BulkMetric { failed = new CountMetric(); } + @Override + public void init(Settings settings) { + start(); + } + @Override public Metered getTotalIngest() { return totalIngest; @@ -78,6 +79,11 @@ public class SimpleBulkMetric implements BulkMetric { return failed; } + @Override + public long elapsed() { + return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L; + } + @Override public void start() { this.started = System.nanoTime(); @@ -91,8 +97,8 @@ public class SimpleBulkMetric implements BulkMetric { } @Override - public long elapsed() { - return (stopped != null ? stopped : System.nanoTime()) - started; + public void close() { + stop(); + totalIngest.shutdown(); } - } diff --git a/elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java similarity index 80% rename from elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java rename to elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 1aeeed4..28dbb45 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/BulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -5,16 +5,14 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.xbib.elx.api.BulkProcessor; -import java.io.Closeable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -28,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; * requests allowed to be executed in parallel. * In order to create a new bulk processor, use the {@link Builder}. */ -public class BulkProcessor implements Closeable { +public class DefaultBulkProcessor implements BulkProcessor { private final int bulkActions; @@ -46,8 +44,8 @@ public class BulkProcessor implements Closeable { private volatile boolean closed; - private BulkProcessor(Client client, Listener listener, String name, int concurrentRequests, - int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { + private DefaultBulkProcessor(Client client, Listener listener, String name, int concurrentRequests, + int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { this.executionIdGen = new AtomicLong(); this.closed = false; this.bulkActions = bulkActions; @@ -77,19 +75,6 @@ public class BulkProcessor implements Closeable { return new Builder(client, listener); } - /** - * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. - */ - @Override - public void close() { - try { - // 0 = immediate close - awaitClose(0, TimeUnit.NANOSECONDS); - } catch (InterruptedException exc) { - Thread.currentThread().interrupt(); - } - } - /** * Wait for bulk request handler with flush. * @param timeout the timeout value @@ -97,7 +82,8 @@ public class BulkProcessor implements Closeable { * @return true is method was successful, false if timeout * @throws InterruptedException if timeout */ - public boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException { + @Override + public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException { if (closed) { return true; } @@ -124,6 +110,7 @@ public class BulkProcessor implements Closeable { * bulk requests completed * @throws InterruptedException If the current thread is interrupted */ + @Override public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { if (closed) { return true; @@ -140,46 +127,51 @@ public class BulkProcessor implements Closeable { } /** - * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest} - * (for example, if no id is provided, one will be generated, or usage of the create flag). + * Adds either a delete or an index request. * * @param request request * @return his bulk processor */ - public BulkProcessor add(IndexRequest request) { - return add((ActionRequest) request); + @Override + public DefaultBulkProcessor add(ActionRequest request) { + return add(request, null); } /** - * Adds an {@link DeleteRequest} to the list of actions to execute. + * Adds either a delete or an index request with a payload. * * @param request request + * @param payload payload * @return his bulk processor */ - public BulkProcessor add(DeleteRequest request) { - return add((ActionRequest) request); + @Override + public DefaultBulkProcessor add(ActionRequest request, Object payload) { + internalAdd(request, payload); + return this; } /** - * Adds either a delete or an index request. - * - * @param request request - * @return his bulk processor + * Flush pending delete or index requests. */ - public BulkProcessor add(ActionRequest request) { - return add(request, null); + @Override + public synchronized void flush() { + ensureOpen(); + if (bulkRequest.numberOfActions() > 0) { + execute(); + } } /** - * Adds either a delete or an index request with a payload. - * - * @param request request - * @param payload payload - * @return his bulk processor + * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. */ - public BulkProcessor add(ActionRequest request, Object payload) { - internalAdd(request, payload); - return this; + @Override + public void close() { + try { + // 0 = immediate close + awaitClose(0, TimeUnit.NANOSECONDS); + } catch (InterruptedException exc) { + Thread.currentThread().interrupt(); + } } private void ensureOpen() { @@ -213,53 +205,7 @@ public class BulkProcessor implements Closeable { return bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions || bulkSize != -1 && - bulkRequest.estimatedSizeInBytes() >= bulkSize; - } - - /** - * Flush pending delete or index requests. - */ - public synchronized void flush() { - ensureOpen(); - if (bulkRequest.numberOfActions() > 0) { - execute(); - } - } - - /** - * A listener for the execution. - */ - public interface Listener { - - /** - * Callback before the bulk is executed. - * - * @param executionId execution ID - * @param request request - */ - void beforeBulk(long executionId, BulkRequest request); - - /** - * Callback after a successful execution of bulk request. - * - * @param executionId execution ID - * @param request request - * @param response response - */ - void afterBulk(long executionId, BulkRequest request, BulkResponse response); - - /** - * Callback after a failed execution of bulk request. - * - * Note that in case an instance of InterruptedException is passed, which means that request - * processing has been - * cancelled externally, the thread's interruption status has been restored prior to calling this method. - * - * @param executionId execution ID - * @param request request - * @param failure failure - */ - void afterBulk(long executionId, BulkRequest request, Throwable failure); + bulkRequest.estimatedSizeInBytes() >= bulkSize; } /** @@ -359,8 +305,8 @@ public class BulkProcessor implements Closeable { * * @return a bulk processor */ - public BulkProcessor build() { - return new BulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); + public DefaultBulkProcessor build() { + return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); } } @@ -368,7 +314,7 @@ public class BulkProcessor implements Closeable { @Override public void run() { - synchronized (BulkProcessor.this) { + synchronized (DefaultBulkProcessor.this) { if (closed) { return; } @@ -380,24 +326,13 @@ public class BulkProcessor implements Closeable { } } - /** - * Abstracts the low-level details of bulk request handling. - */ - interface BulkRequestHandler { - - void execute(BulkRequest bulkRequest, long executionId); - - boolean close(long timeout, TimeUnit unit) throws InterruptedException; - - } - private static class SyncBulkRequestHandler implements BulkRequestHandler { private final Client client; - private final BulkProcessor.Listener listener; + private final DefaultBulkProcessor.Listener listener; - SyncBulkRequestHandler(Client client, BulkProcessor.Listener listener) { + SyncBulkRequestHandler(Client client, DefaultBulkProcessor.Listener listener) { this.client = client; this.listener = listener; } @@ -427,13 +362,13 @@ public class BulkProcessor implements Closeable { private final Client client; - private final BulkProcessor.Listener listener; + private final DefaultBulkProcessor.Listener listener; private final Semaphore semaphore; private final int concurrentRequests; - private AsyncBulkRequestHandler(Client client, BulkProcessor.Listener listener, int concurrentRequests) { + private AsyncBulkRequestHandler(Client client, DefaultBulkProcessor.Listener listener, int concurrentRequests) { this.client = client; this.listener = listener; this.concurrentRequests = concurrentRequests; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java new file mode 100644 index 0000000..52127e1 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -0,0 +1,214 @@ +package org.xbib.elx.common; + +import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.api.IndexRetention; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.TimeUnit; + +public class DefaultIndexDefinition implements IndexDefinition { + + private String index; + + private String fullIndexName; + + private String dateTimePattern; + + private URL settingsUrl; + + private URL mappingsUrl; + + private boolean enabled; + + private boolean ignoreErrors; + + private boolean switchAliases; + + private boolean hasForceMerge; + + private int replicaLevel; + + private IndexRetention indexRetention; + + private long maxWaitTime; + + private TimeUnit maxWaitTimeUnit; + + private long startRefreshInterval; + + private long stopRefreshInterval; + + @Override + public IndexDefinition setIndex(String index) { + this.index = index; + return this; + } + + @Override + public String getIndex() { + return index; + } + + @Override + public IndexDefinition setFullIndexName(String fullIndexName) { + this.fullIndexName = fullIndexName; + return this; + } + + @Override + public String getFullIndexName() { + return fullIndexName; + } + + @Override + public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException { + this.settingsUrl = settingsUrlString != null ? new URL(settingsUrlString) : null; + return this; + } + + @Override + public IndexDefinition setSettingsUrl(URL settingsUrl) { + this.settingsUrl = settingsUrl; + return this; + } + + @Override + public URL getSettingsUrl() { + return settingsUrl; + } + + @Override + public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException { + this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null; + return this; + } + + @Override + public IndexDefinition setMappingsUrl(URL mappingsUrl) { + this.mappingsUrl = mappingsUrl; + return this; + } + + @Override + public URL getMappingsUrl() { + return mappingsUrl; + } + + @Override + public IndexDefinition setDateTimePattern(String timeWindow) { + this.dateTimePattern = timeWindow; + return this; + } + + @Override + public String getDateTimePattern() { + return dateTimePattern; + } + + @Override + public IndexDefinition setEnabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public IndexDefinition setIgnoreErrors(boolean ignoreErrors) { + this.ignoreErrors = ignoreErrors; + return this; + } + + @Override + public boolean ignoreErrors() { + return ignoreErrors; + } + + @Override + public IndexDefinition setShift(boolean switchAliases) { + this.switchAliases = switchAliases; + return this; + } + + @Override + public boolean isShiftEnabled() { + return switchAliases; + } + + @Override + public IndexDefinition setForceMerge(boolean hasForceMerge) { + this.hasForceMerge = hasForceMerge; + return this; + } + + @Override + public boolean hasForceMerge() { + return hasForceMerge; + } + + @Override + public IndexDefinition setReplicaLevel(int replicaLevel) { + this.replicaLevel = replicaLevel; + return this; + } + + @Override + public int getReplicaLevel() { + return replicaLevel; + } + + @Override + public IndexDefinition setRetention(IndexRetention indexRetention) { + this.indexRetention = indexRetention; + return this; + } + + @Override + public IndexRetention getRetention() { + return indexRetention; + } + + @Override + public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) { + this.maxWaitTime = maxWaitTime; + this.maxWaitTimeUnit = timeUnit; + return this; + } + + @Override + public long getMaxWaitTime() { + return maxWaitTime; + } + + @Override + public TimeUnit getMaxWaitTimeUnit() { + return maxWaitTimeUnit; + } + + @Override + public IndexDefinition setStartRefreshInterval(long seconds) { + this.startRefreshInterval = seconds; + return this; + } + + @Override + public long getStartRefreshInterval() { + return startRefreshInterval; + } + + @Override + public IndexDefinition setStopRefreshInterval(long seconds) { + this.stopRefreshInterval = seconds; + return this; + } + + @Override + public long getStopRefreshInterval() { + return stopRefreshInterval; + } + +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java new file mode 100644 index 0000000..4e49be3 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java @@ -0,0 +1,32 @@ +package org.xbib.elx.common; + +import org.xbib.elx.api.IndexRetention; + +public class DefaultIndexRetention implements IndexRetention { + + private int delta; + + private int minToKeep; + + @Override + public IndexRetention setDelta(int delta) { + this.delta = delta; + return this; + } + + @Override + public int getDelta() { + return delta; + } + + @Override + public IndexRetention setMinToKeep(int minToKeep) { + this.minToKeep = minToKeep; + return this; + } + + @Override + public int getMinToKeep() { + return minToKeep; + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java index 7e5a1e4..58e303d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockExtendedClient.java @@ -6,8 +6,10 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; +import java.util.concurrent.TimeUnit; + /** - * Mock client, it does not perform actions on a cluster. Useful for testing or dry runs. + * Mock client, it does not perform any actions on a cluster. Useful for testing. */ public class MockExtendedClient extends AbstractExtendedClient { @@ -56,18 +58,13 @@ public class MockExtendedClient extends AbstractExtendedClient { return this; } - @Override - public MockExtendedClient flushIngest() { - return this; - } - @Override public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) { return this; } @Override - public MockExtendedClient stopBulk(String index, String maxWaitTime) { + public MockExtendedClient stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) { return this; } @@ -92,32 +89,37 @@ public class MockExtendedClient extends AbstractExtendedClient { } @Override - public boolean forceMerge(String index, String maxWaitTime) { + public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { return true; } @Override - public boolean waitForCluster(String healthColor, String timeValue) { + public boolean waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) { return true; } @Override - public boolean waitForResponses(String maxWaitTime) { + public boolean waitForResponses(long maxWaitTime, TimeUnit timeUnit) { return true; } @Override - public boolean waitForRecovery(String index, String maxWaitTime) { + public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { return true; } @Override - public MockExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) { + public MockExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) { return this; } @Override - public void shutdown() { + public void flush() { + // nothing to do + } + + @Override + public void close() { // nothing to do } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java b/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java deleted file mode 100644 index d606ecc..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/SimpleBulkControl.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.xbib.elx.common; - -import org.xbib.elx.api.BulkControl; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - */ -public class SimpleBulkControl implements BulkControl { - - private final Set indexNames; - - private final Map startBulkRefreshIntervals; - - private final Map stopBulkRefreshIntervals; - - private String maxWaitTime; - - public SimpleBulkControl() { - indexNames = new HashSet<>(); - startBulkRefreshIntervals = new HashMap<>(); - stopBulkRefreshIntervals = new HashMap<>(); - maxWaitTime = "30s"; - } - - @Override - public void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval) { - indexNames.add(indexName); - startBulkRefreshIntervals.put(indexName, startRefreshInterval); - stopBulkRefreshIntervals.put(indexName, stopRefreshInterval); - } - - @Override - public boolean isBulk(String indexName) { - return indexNames.contains(indexName); - } - - @Override - public void finishBulk(String indexName) { - indexNames.remove(indexName); - } - - @Override - public Set indices() { - return indexNames; - } - - @Override - public Map getStartBulkRefreshIntervals() { - return startBulkRefreshIntervals; - } - - @Override - public Map getStopBulkRefreshIntervals() { - return stopBulkRefreshIntervals; - } - - @Override - public String getMaxWaitTime() { - return maxWaitTime; - } - -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java b/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java index 62a8d78..5e86ba1 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java +++ b/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java @@ -1 +1,4 @@ -package org.xbib.elx.common.io; \ No newline at end of file +/** + * I/O helpers for Elasticsearch client extensions. + */ +package org.xbib.elx.common.io; diff --git a/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java b/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java index 3c41bfe..cd393c9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java +++ b/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java @@ -1 +1,4 @@ -package org.xbib.elx.common.util; \ No newline at end of file +/** + * Utilities for Elasticsearch client extensions. + */ +package org.xbib.elx.common.util; diff --git a/elx-node/src/main/java/org/xbib/elx/node/ExtendedNodeClient.java b/elx-node/src/main/java/org/xbib/elx/node/ExtendedNodeClient.java index 493a596..7eca86e 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/ExtendedNodeClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/ExtendedNodeClient.java @@ -47,10 +47,9 @@ public class ExtendedNodeClient extends AbstractExtendedClient { return null; } - @Override - public void shutdown() throws IOException { - super.shutdown(); + public void close() throws IOException { + super.close(); try { if (node != null) { logger.debug("closing node..."); diff --git a/elx-node/src/main/java/org/xbib/elx/node/package-info.java b/elx-node/src/main/java/org/xbib/elx/node/package-info.java new file mode 100644 index 0000000..c2a9dfb --- /dev/null +++ b/elx-node/src/main/java/org/xbib/elx/node/package-info.java @@ -0,0 +1,4 @@ +/** + * Node client extensions. + */ +package org.xbib.elx.node; diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/ClientTest.java similarity index 83% rename from elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java rename to elx-node/src/test/java/org/xbib/elx/node/ClientTest.java index de28b4b..5aaae33 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ClientTest.java @@ -2,7 +2,7 @@ package org.xbib.elx.node; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import org.apache.logging.log4j.LogManager; @@ -27,9 +27,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class ExtendedNodeClientTest extends NodeTestUtils { +public class ClientTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedNodeClientTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(ClientTest.class.getSimpleName()); private static final Long ACTIONS = 25000L; @@ -55,17 +55,17 @@ public class ExtendedNodeClientTest extends NodeTestUtils { try { client.newIndex("test"); client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); + client.close(); } } @@ -76,11 +76,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) .build(); client.newIndex("test"); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - client.shutdown(); + client.close(); } @Test @@ -105,11 +101,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils { client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); logger.info("mappings={}", getMappingsResponse.getMappings()); assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc")); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - client.shutdown(); + client.close(); } @Test @@ -125,22 +117,22 @@ public class ExtendedNodeClientTest extends NodeTestUtils { for (int i = 0; i < ACTIONS; i++) { client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) .setQuery(QueryBuilders.matchAllQuery()).setSize(0); assertEquals(numactions, searchRequestBuilder.execute().actionGet().getHits().getTotalHits()); - client.shutdown(); + client.close(); } } @@ -172,9 +164,9 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } logger.info("waiting for latch..."); if (latch.await(5, TimeUnit.MINUTES)) { - logger.info("last flush..."); - client.flushIngest(); - client.waitForResponses("60s"); + logger.info("flush..."); + client.flush(); + client.waitForResponses(60L, TimeUnit.SECONDS); logger.info("got all responses, pool shutdown..."); pool.shutdown(); logger.info("pool is shut down"); @@ -184,18 +176,18 @@ public class ExtendedNodeClientTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.stopBulk("test", "30s"); + client.stopBulk("test", 30L, TimeUnit.SECONDS); assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) .setQuery(QueryBuilders.matchAllQuery()).setSize(0); assertEquals(maxthreads * actions, searchRequestBuilder.execute().actionGet().getHits().getTotalHits()); - client.shutdown(); + client.close(); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClusterBlockTest.java b/elx-node/src/test/java/org/xbib/elx/node/ClusterBlockTest.java similarity index 73% rename from elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClusterBlockTest.java rename to elx-node/src/test/java/org/xbib/elx/node/ClusterBlockTest.java index b38555a..23cbbe3 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeClusterBlockTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ClusterBlockTest.java @@ -4,17 +4,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -@Ignore -public class ExtendedNodeClusterBlockTest extends NodeTestUtils { +public class ClusterBlockTest extends NodeTestUtils { private static final Logger logger = LogManager.getLogger("test"); @@ -23,7 +22,6 @@ public class ExtendedNodeClusterBlockTest extends NodeTestUtils { try { setClusterName(); startNode("1"); - findNodeAddress(); // do not wait for green health state logger.info("ready"); } catch (Throwable t) { @@ -41,11 +39,11 @@ public class ExtendedNodeClusterBlockTest extends NodeTestUtils { @Test(expected = ClusterBlockException.class) public void testClusterBlock() throws Exception { - BulkRequestBuilder brb = client("1").prepareBulk(); - XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject(); - IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1").setSource(builder); + Client client = client("1"); + XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field1", "value1").endObject(); + IndexRequestBuilder irb = client.prepareIndex("test", "test", "1").setSource(builder); + BulkRequestBuilder brb = client.prepareBulk(); brb.add(irb); brb.execute().actionGet(); } - } diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/DuplicateIDTest.java similarity index 78% rename from elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java rename to elx-node/src/test/java/org/xbib/elx/node/DuplicateIDTest.java index ad75a95..d2126e5 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendeNodeDuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/DuplicateIDTest.java @@ -10,13 +10,14 @@ import org.junit.Test; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import java.util.concurrent.TimeUnit; + import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.*; -@Ignore -public class ExtendeNodeDuplicateIDTest extends NodeTestUtils { +public class DuplicateIDTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendeNodeDuplicateIDTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName()); private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; @@ -34,8 +35,8 @@ public class ExtendeNodeDuplicateIDTest extends NodeTestUtils { for (int i = 0; i < ACTIONS; i++) { client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) .setIndices("test") @@ -47,12 +48,12 @@ public class ExtendeNodeDuplicateIDTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.shutdown(); + client.close(); assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java b/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java deleted file mode 100644 index 01e1df4..0000000 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeUpdateReplicaLevelTest.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.xbib.elx.node; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.settings.Settings; -import org.junit.Ignore; -import org.junit.Test; -import org.xbib.elx.common.ClientBuilder; - -import java.util.HashMap; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -@Ignore -public class ExtendedNodeUpdateReplicaLevelTest extends NodeTestUtils { - - private static final Logger logger = LogManager.getLogger(ExtendedNodeUpdateReplicaLevelTest.class.getSimpleName()); - - @Test - public void testUpdateReplicaLevel() throws Exception { - - long numberOfShards = 2; - int replicaLevel = 3; - - // we need 3 nodes for replica level 3 - startNode("2"); - startNode("3"); - - long shardsAfterReplica; - - Settings settings = Settings.settingsBuilder() - .put("index.number_of_shards", numberOfShards) - .put("index.number_of_replicas", 0) - .build(); - - final ExtendedNodeClient client = ClientBuilder.builder(client("1")) - .provider(ExtendedNodeClientProvider.class) - .build(); - - try { - client.newIndex("replicatest", settings, new HashMap<>()); - client.waitForCluster("GREEN", "30s"); - for (int i = 0; i < 12345; i++) { - client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); - } - client.flushIngest(); - client.waitForResponses("30s"); - client.updateReplicaLevel("replicatest", replicaLevel, "30s"); - //assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - client.shutdown(); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - } - } - -} diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java b/elx-node/src/test/java/org/xbib/elx/node/IndexShiftTest.java similarity index 50% rename from elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java rename to elx-node/src/test/java/org/xbib/elx/node/IndexShiftTest.java index 222b261..9fb687a 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeIndexAliasTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/IndexShiftTest.java @@ -11,16 +11,18 @@ import org.xbib.elx.common.ClientBuilder; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @Ignore -public class ExtendedNodeIndexAliasTest extends NodeTestUtils { +public class IndexShiftTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedNodeIndexAliasTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getSimpleName()); @Test - public void testIndexAlias() throws Exception { + public void testIndexShift() throws Exception { final ExtendedNodeClient client = ClientBuilder.builder(client("1")) .provider(ExtendedNodeClientProvider.class) .build(); @@ -29,37 +31,47 @@ public class ExtendedNodeIndexAliasTest extends NodeTestUtils { for (int i = 0; i < 1; i++) { client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); + client.flush(); client.refreshIndex("test1234"); List simpleAliases = Arrays.asList("a", "b", "c"); - client.switchIndex("test", "test1234", simpleAliases); + client.shiftIndex("test", "test1234", simpleAliases); client.newIndex("test5678"); for (int i = 0; i < 1; i++) { client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); + client.flush(); client.refreshIndex("test5678"); simpleAliases = Arrays.asList("d", "e", "f"); - client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) -> + client.shiftIndex("test", "test5678", simpleAliases, (builder, index, alias) -> builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias))); - Map aliases = client.getIndexFilters("test5678"); - logger.info("aliases of index test5678 = {}", aliases); + Map indexFilters = client.getIndexFilters("test5678"); + logger.info("aliases of index test5678 = {}", indexFilters); + assertTrue(indexFilters.containsKey("a")); + assertTrue(indexFilters.containsKey("b")); + assertTrue(indexFilters.containsKey("c")); + assertTrue(indexFilters.containsKey("d")); + assertTrue(indexFilters.containsKey("e")); - aliases = client.getAliasFilters("test"); + Map aliases = client.getIndexFilters(client.resolveAlias("test")); logger.info("aliases of alias test = {}", aliases); + assertTrue(aliases.containsKey("a")); + assertTrue(aliases.containsKey("b")); + assertTrue(aliases.containsKey("c")); + assertTrue(aliases.containsKey("d")); + assertTrue(aliases.containsKey("e")); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.waitForResponses("30s"); - client.shutdown(); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + client.waitForResponses(30L, TimeUnit.SECONDS); + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java b/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java index 05bf386..7faed8d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java +++ b/elx-node/src/test/java/org/xbib/elx/node/NodeTestUtils.java @@ -63,9 +63,7 @@ public class NodeTestUtils { Files.delete(dir); return FileVisitResult.CONTINUE; } - }); - } @Before @@ -74,7 +72,6 @@ public class NodeTestUtils { logger.info("starting"); setClusterName(); startNode("1"); - findNodeAddress(); try { ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) @@ -160,18 +157,6 @@ public class NodeTestUtils { logger.info("all nodes closed"); } - protected void findNodeAddress() { - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); - Object obj = response.iterator().next().getTransport().getAddress() - .publishAddress(); - if (obj instanceof InetSocketTransportAddress) { - InetSocketTransportAddress address = (InetSocketTransportAddress) obj; - String host = address.address().getHostName(); - int port = address.address().getPort(); - } - } - private Node buildNode(String id) { Settings nodeSettings = settingsBuilder() .put(getNodeSettings()) diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java b/elx-node/src/test/java/org/xbib/elx/node/ReplicaTest.java similarity index 66% rename from elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java rename to elx-node/src/test/java/org/xbib/elx/node/ReplicaTest.java index ed53cf5..762800f 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeReplicaTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/ReplicaTest.java @@ -19,15 +19,16 @@ import org.xbib.elx.common.ClientBuilder; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; @Ignore -public class ExtendedNodeReplicaTest extends NodeTestUtils { +public class ReplicaTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedNodeReplicaTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName()); @Test public void testReplicaLevel() throws Exception { @@ -54,15 +55,15 @@ public class ExtendedNodeReplicaTest extends NodeTestUtils { try { client.newIndex("test1", settingsTest1, new HashMap<>()) .newIndex("test2", settingsTest2, new HashMap<>()); - client.waitForCluster("GREEN", "30s"); + client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS); for (int i = 0; i < 1234; i++) { client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } for (int i = 0; i < 1234; i++) { client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { @@ -97,11 +98,51 @@ public class ExtendedNodeReplicaTest extends NodeTestUtils { } catch (Exception e) { logger.error("delete index failed, ignored. Reason:", e); } - client.shutdown(); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); + } + } + + @Test + public void testUpdateReplicaLevel() throws Exception { + + long numberOfShards = 2; + int replicaLevel = 3; + + // we need 3 nodes for replica level 3 + startNode("2"); + startNode("3"); + + Settings settings = Settings.settingsBuilder() + .put("index.number_of_shards", numberOfShards) + .put("index.number_of_replicas", 0) + .build(); + + final ExtendedNodeClient client = ClientBuilder.builder(client("1")) + .provider(ExtendedNodeClientProvider.class) + .build(); + + try { + client.newIndex("replicatest", settings, new HashMap<>()); + client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS); + for (int i = 0; i < 12345; i++) { + client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + } + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); + client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS); + assertEquals(replicaLevel, client.getReplicaLevel("replicatest")); + } catch (NoNodeAvailableException e) { + logger.warn("skipping, no node available"); + } finally { + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); + } + assertNull(client.getBulkController().getLastBulkError()); } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/SmokeTest.java similarity index 70% rename from elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java rename to elx-node/src/test/java/org/xbib/elx/node/SmokeTest.java index adac4c6..cb70fe0 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/ExtendedNodeSmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/SmokeTest.java @@ -6,16 +6,16 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.settings.Settings; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; -import org.xbib.elx.common.SimpleBulkControl; -import org.xbib.elx.common.SimpleBulkMetric; import org.xbib.elx.api.IndexDefinition; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; -public class ExtendedNodeSmokeTest extends NodeTestUtils { +public class SmokeTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedNodeSmokeTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(SmokeTest.class.getSimpleName()); @Test public void smokeTest() throws Exception { @@ -23,21 +23,19 @@ public class ExtendedNodeSmokeTest extends NodeTestUtils { .provider(ExtendedNodeClientProvider.class) .build(); try { - client.setBulkControl(new SimpleBulkControl()); - client.setBulkMetric(new SimpleBulkMetric()); client.newIndex("test"); client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30, TimeUnit.SECONDS); assertEquals(clusterName, client.getClusterName()); client.checkMapping("test"); client.update("test", "1", "{ \"name\" : \"Another name\"}"); - client.flushIngest(); + client.flush(); - client.waitForRecovery("test", "10s"); + client.waitForRecovery("test", 10L, TimeUnit.SECONDS); client.delete("test", "1"); client.deleteIndex("test"); @@ -47,7 +45,7 @@ public class ExtendedNodeSmokeTest extends NodeTestUtils { assertEquals(0, indexDefinition.getReplicaLevel()); client.newIndex(indexDefinition); client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); - client.flushIngest(); + client.flush(); client.updateReplicaLevel(indexDefinition, 2); int replica = client.getReplicaLevel(indexDefinition); @@ -59,11 +57,11 @@ public class ExtendedNodeSmokeTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); } } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java index e883091..685b9ec 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/ExtendedTransportClient.java @@ -74,15 +74,15 @@ public class ExtendedTransportClient extends AbstractExtendedClient { } @Override - public synchronized void shutdown() throws IOException { - super.shutdown(); - logger.info("shutting down..."); + public synchronized void close() throws IOException { + super.close(); + logger.info("closing"); if (getClient() != null) { TransportClient client = (TransportClient) getClient(); client.close(); client.threadPool().shutdown(); } - logger.info("shutting down completed"); + logger.info("close completed"); } private Collection findAddresses(Settings settings) throws IOException { diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClient.java index 827f657..1a607ad 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClient.java @@ -308,7 +308,9 @@ public class TransportClient extends AbstractClient { transportService.connectToNode(node); } catch (Exception e) { it.remove(); - logger.debug("failed to connect to discovered node [" + node + "]", e); + if (logger.isDebugEnabled()) { + logger.debug("failed to connect to discovered node [" + node + "]", e); + } } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ClientTest.java similarity index 83% rename from elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java rename to elx-transport/src/test/java/org/xbib/elx/transport/ClientTest.java index 5085df6..56da530 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ClientTest.java @@ -25,12 +25,12 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class ExtendedTransportClientTest extends NodeTestUtils { +public class ClientTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedTransportClientTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(ClientTest.class.getSimpleName()); private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; @@ -54,10 +54,6 @@ public class ExtendedTransportClientTest extends NodeTestUtils { .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) .build(); client.newIndex("test"); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); try { client.deleteIndex("test") .newIndex("test") @@ -65,11 +61,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.error("no node available"); } finally { - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - client.shutdown(); + client.close(); } } @@ -84,17 +76,17 @@ public class ExtendedTransportClientTest extends NodeTestUtils { try { client.newIndex("test"); client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); + client.close(); } } @@ -121,11 +113,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); logger.info("mappings={}", getMappingsResponse.getMappings()); assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc")); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - client.shutdown(); + client.close(); } @Test @@ -142,17 +130,17 @@ public class ExtendedTransportClientTest extends NodeTestUtils { for (int i = 0; i < ACTIONS; i++) { client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); + client.close(); } } @@ -190,21 +178,21 @@ public class ExtendedTransportClientTest extends NodeTestUtils { logger.info("waiting for latch..."); if (latch.await(60, TimeUnit.SECONDS)) { logger.info("flush ..."); - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); logger.info("pool to be shut down ..."); pool.shutdown(); logger.info("poot shut down"); } - client.stopBulk("test", "30s"); + client.stopBulk("test", 30L, TimeUnit.SECONDS); assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount()); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); // extra search lookup client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) @@ -214,7 +202,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils { .setSize(0); assertEquals(maxthreads * maxloop, searchRequestBuilder.execute().actionGet().getHits().getTotalHits()); - client.shutdown(); + client.close(); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/DuplicateIDTest.java similarity index 74% rename from elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java rename to elx-transport/src/test/java/org/xbib/elx/transport/DuplicateIDTest.java index ea1cd1d..6f6b6bd 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportDuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/DuplicateIDTest.java @@ -9,12 +9,16 @@ import org.junit.Test; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import java.util.concurrent.TimeUnit; + import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; -public class ExtendedTransportDuplicateIDTest extends NodeTestUtils { +public class DuplicateIDTest extends NodeTestUtils { - private final static Logger logger = LogManager.getLogger(ExtendedTransportDuplicateIDTest.class.getSimpleName()); + private final static Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName()); private final static Long MAX_ACTIONS_PER_REQUEST = 1000L; @@ -33,8 +37,8 @@ public class ExtendedTransportDuplicateIDTest extends NodeTestUtils { for (int i = 0; i < ACTIONS; i++) { client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE) .setIndices("test") @@ -46,12 +50,12 @@ public class ExtendedTransportDuplicateIDTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.shutdown(); + client.close(); assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); + assertNull(client.getBulkController().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java deleted file mode 100644 index 1992021..0000000 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportUpdateReplicaLevelTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package org.xbib.elx.transport; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.settings.Settings; -import org.junit.Test; -import org.xbib.elx.common.ClientBuilder; - -import java.util.HashMap; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -public class ExtendedTransportUpdateReplicaLevelTest extends NodeTestUtils { - - private static final Logger logger = LogManager.getLogger(ExtendedTransportUpdateReplicaLevelTest.class.getSimpleName()); - - @Test - public void testUpdateReplicaLevel() throws Exception { - - long numberOfShards = 2; - int replicaLevel = 3; - - // we need 3 nodes for replica level 3 - startNode("2"); - startNode("3"); - - int shardsAfterReplica; - - final ExtendedTransportClient client = ClientBuilder.builder() - .provider(ExtendedTransportClientProvider.class) - .put(getSettings()) - .build(); - - Settings settings = Settings.settingsBuilder() - .put("index.number_of_shards", numberOfShards) - .put("index.number_of_replicas", 0) - .build(); - - try { - client.newIndex("replicatest", settings, new HashMap<>()); - client.waitForCluster("GREEN", "30s"); - for (int i = 0; i < 12345; i++) { - client.index("replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); - } - client.flushIngest(); - client.waitForResponses("30s"); - client.updateReplicaLevel("replicatest", replicaLevel, "30s"); - //assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - client.shutdown(); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); - } - assertFalse(client.hasThrowable()); - } - } -} diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/IndexShiftTest.java similarity index 72% rename from elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java rename to elx-transport/src/test/java/org/xbib/elx/transport/IndexShiftTest.java index 8d27231..7c1fdff 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportIndexAliasTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/IndexShiftTest.java @@ -10,13 +10,14 @@ import org.xbib.elx.common.ClientBuilder; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class ExtendedTransportIndexAliasTest extends NodeTestUtils { +public class IndexShiftTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedTransportIndexAliasTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getSimpleName()); @Test public void testIndexAlias() throws Exception { @@ -28,21 +29,21 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils { for (int i = 0; i < 1; i++) { client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); + client.flush(); client.refreshIndex("test1234"); List simpleAliases = Arrays.asList("a", "b", "c"); - client.switchIndex("test", "test1234", simpleAliases); + client.shiftIndex("test", "test1234", simpleAliases); client.newIndex("test5678"); for (int i = 0; i < 1; i++) { client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); + client.flush(); client.refreshIndex("test5678"); simpleAliases = Arrays.asList("d", "e", "f"); - client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) -> + client.shiftIndex("test", "test5678", simpleAliases, (builder, index, alias) -> builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias))); Map indexFilters = client.getIndexFilters("test5678"); logger.info("index filters of index test5678 = {}", indexFilters); @@ -52,7 +53,7 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils { assertTrue(indexFilters.containsKey("d")); assertTrue(indexFilters.containsKey("e")); - Map aliases = client.getAliasFilters("test"); + Map aliases = client.getIndexFilters(client.resolveAlias("test")); logger.info("aliases of alias test = {}", aliases); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); @@ -60,15 +61,15 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); - client.waitForResponses("30s"); - assertFalse(client.hasThrowable()); + client.waitForResponses(30L, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); + client.close(); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/ReplicaTest.java similarity index 65% rename from elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java rename to elx-transport/src/test/java/org/xbib/elx/transport/ReplicaTest.java index be95352..027b034 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportReplicaTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/ReplicaTest.java @@ -18,14 +18,15 @@ import org.xbib.elx.common.ClientBuilder; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; -public class ExtendedTransportReplicaTest extends NodeTestUtils { +public class ReplicaTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedTransportReplicaTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName()); @Test public void testReplicaLevel() throws Exception { @@ -53,15 +54,15 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils { try { client.newIndex("test1", settingsTest1, new HashMap<>()) .newIndex("test2", settingsTest2, new HashMap<>()); - client.waitForCluster("GREEN", "30s"); + client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS); for (int i = 0; i < 1234; i++) { client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } for (int i = 0; i < 1234; i++) { client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); } - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); client.refreshIndex("test1"); client.refreshIndex("test2"); } catch (NoNodeAvailableException e) { @@ -96,12 +97,54 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils { } catch (Exception e) { logger.error("delete index failed, ignored. Reason:", e); } - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); + client.close(); } } + @Test + public void testUpdateReplicaLevel() throws Exception { + + long numberOfShards = 2; + int replicaLevel = 3; + + // we need 3 nodes for replica level 3 + startNode("2"); + startNode("3"); + + int shardsAfterReplica; + + final ExtendedTransportClient client = ClientBuilder.builder() + .provider(ExtendedTransportClientProvider.class) + .put(getSettings()) + .build(); + + Settings settings = Settings.settingsBuilder() + .put("index.number_of_shards", numberOfShards) + .put("index.number_of_replicas", 0) + .build(); + + try { + client.newIndex("replicatest", settings, new HashMap<>()); + client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS); + for (int i = 0; i < 12345; i++) { + client.index("replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}"); + } + client.flush(); + client.waitForResponses(30L, TimeUnit.SECONDS); + client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS); + assertEquals(replicaLevel, client.getReplicaLevel("replicatest")); + } catch (NoNodeAvailableException e) { + logger.warn("skipping, no node available"); + } finally { + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); + } + assertNull(client.getBulkController().getLastBulkError()); + } + } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/SmokeTest.java similarity index 62% rename from elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java rename to elx-transport/src/test/java/org/xbib/elx/transport/SmokeTest.java index 3422762..d745015 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/ExtendedTransportClientSingleNodeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/SmokeTest.java @@ -6,12 +6,14 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.junit.Test; import org.xbib.elx.common.ClientBuilder; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; -public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils { +public class SmokeTest extends NodeTestUtils { - private static final Logger logger = LogManager.getLogger(ExtendedTransportClientSingleNodeTest.class.getSimpleName()); + private static final Logger logger = LogManager.getLogger(SmokeTest.class.getSimpleName()); @Test public void testSingleDocNodeClient() throws Exception { @@ -22,17 +24,17 @@ public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils { try { client.newIndex("test"); client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flushIngest(); - client.waitForResponses("30s"); + client.flush(); + client.waitForResponses(30, TimeUnit.SECONDS); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); - if (client.hasThrowable()) { - logger.error("error", client.getThrowable()); + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); } - assertFalse(client.hasThrowable()); - client.shutdown(); + assertNull(client.getBulkController().getLastBulkError()); } } } diff --git a/gradle.properties b/gradle.properties index c961b52..98d19b7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.4 +version = 2.2.1.5 xbib-metrics.version = 1.1.0 xbib-guice.version = 4.0.4