refactored bulk controller, improved api, renamed und cleaned methods, result classes, removed redundant code and methods

2.2.1.5
Jörg Prante 5 years ago
parent 4701447d3c
commit 5184b75b36

@ -1,21 +0,0 @@
package org.xbib.elx.api;
import java.util.Map;
import java.util.Set;
public interface BulkControl {
void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval);
boolean isBulk(String indexName);
void finishBulk(String indexName);
Set<String> indices();
Map<String, Long> getStartBulkRefreshIntervals();
Map<String, Long> getStopBulkRefreshIntervals();
String getMaxWaitTime();
}

@ -0,0 +1,36 @@
package org.xbib.elx.api;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public interface BulkController extends Closeable, Flushable {
void init(Settings settings);
Throwable getLastBulkError();
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
void index(IndexRequest indexRequest);
void delete(DeleteRequest deleteRequest);
void update(UpdateRequest updateRequest);
boolean waitForResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
}

@ -1,9 +1,14 @@
package org.xbib.elx.api;
import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.Count;
import org.xbib.metrics.Metered;
public interface BulkMetric {
import java.io.Closeable;
public interface BulkMetric extends Closeable {
void init(Settings settings);
Metered getTotalIngest();
@ -19,9 +24,9 @@ public interface BulkMetric {
Count getFailed();
long elapsed();
void start();
void stop();
long elapsed();
}

@ -0,0 +1,64 @@
package org.xbib.elx.api;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import java.io.Closeable;
import java.io.Flushable;
import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable {
BulkProcessor add(ActionRequest<?> request);
BulkProcessor add(ActionRequest<?> request, Object payload);
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}
/**
* A listener for the execution.
*/
public interface Listener {
/**
* Callback before the bulk is executed.
*
* @param executionId execution ID
* @param request request
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of bulk request.
*
* @param executionId execution ID
* @param request request
* @param response response
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of bulk request.
*
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*
* @param executionId execution ID
* @param request request
* @param failure failure
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
}
}

@ -6,15 +6,19 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Interface for extended managing and indexing methods of an Elasticsearch client.
*/
public interface ExtendedClient {
public interface ExtendedClient extends Flushable, Closeable {
/**
* Set an Elasticsearch client to extend from it. May be null for TransportClient.
@ -30,22 +34,6 @@ public interface ExtendedClient {
*/
ElasticsearchClient getClient();
/**
* Initiative the extended client, cerates instances and connect to cluster, if required.
*
* @param settings settings
* @return this client
* @throws IOException if init fails
*/
ExtendedClient init(Settings settings) throws IOException;
/**
* Set bulk metric.
* @param bulkMetric the bulk metric
* @return this client
*/
ExtendedClient setBulkMetric(BulkMetric bulkMetric);
/**
* Get bulk metric.
* @return the bulk metric
@ -53,17 +41,20 @@ public interface ExtendedClient {
BulkMetric getBulkMetric();
/**
* Set bulk control.
* @param bulkControl the bulk control
* @return this
* Get buulk control.
* @return the bulk control
*/
ExtendedClient setBulkControl(BulkControl bulkControl);
BulkController getBulkController();
/**
* Get buulk control.
* @return the bulk control
* Initiative the extended client, the bulk metric and bulk controller,
* creates instances and connect to cluster, if required.
*
* @param settings settings
* @return this client
* @throws IOException if init fails
*/
BulkControl getBulkControl();
ExtendedClient init(Settings settings) throws IOException;
/**
* Build index definition from settings.
@ -256,18 +247,12 @@ public interface ExtendedClient {
* Stops bulk mode.
*
* @param index index
* @param maxWaitTime maximum wait time
* @param timeout maximum wait time
* @param timeUnit time unit for timeout
* @return this
* @throws IOException if bulk could not be stopped
*/
ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException;
/**
* Flush bulk indexing, move all pending documents to the cluster.
*
* @return this
*/
ExtendedClient flushIngest();
ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException;
/**
* Update replica level.
@ -284,10 +269,11 @@ public interface ExtendedClient {
* @param index index
* @param level the replica level
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return this
* @throws IOException if replica setting could not be updated
*/
ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException;
ExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException;
/**
* Get replica level.
@ -330,43 +316,57 @@ public interface ExtendedClient {
* Force segment merge of an index.
* @param index the index
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return this
*/
boolean forceMerge(String index, String maxWaitTime);
boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit);
/**
* Wait for all outstanding bulk responses.
*
* @param maxWaitTime maximum wait time
* @param timeout maximum wait time
* @param timeUnit unit of timeout value
* @return true if wait succeeded, false if wait timed out
*/
boolean waitForResponses(String maxWaitTime);
boolean waitForResponses(long timeout, TimeUnit timeUnit);
/**
* Wait for cluster being healthy.
*
* @param healthColor cluster health color to wait for
* @param maxWaitTime time value
* @param timeUnit time unit
* @return true if wait succeeded, false if wait timed out
*/
boolean waitForCluster(String healthColor, String maxWaitTime);
boolean waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit);
/**
* Get current health color.
*
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return the cluster health color
*/
String getHealthColor(String maxWaitTime);
String getHealthColor(long maxWaitTime, TimeUnit timeUnit);
/**
* Wait for index recovery (after replica change).
*
* @param index index
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return true if wait succeeded, false if wait timed out
*/
boolean waitForRecovery(String index, String maxWaitTime);
boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit);
/**
* Update index setting.
* @param index the index
* @param key the key of the value to be updated
* @param value the new value
* @throws IOException if update index setting failed
*/
void updateIndexSetting(String index, String key, Object value) throws IOException;
/**
* Resolve alias.
@ -385,14 +385,6 @@ public interface ExtendedClient {
*/
String resolveMostRecentIndex(String alias);
/**
* Get all alias filters.
*
* @param alias the alias
* @return map of alias filters
*/
Map<String, String> getAliasFilters(String alias);
/**
* Get all index filters.
* @param index the index
@ -401,48 +393,49 @@ public interface ExtendedClient {
Map<String, String> getIndexFilters(String index);
/**
* Switch from one index to another.
* Shift from one index to another.
* @param indexDefinition the index definition
* @param extraAliases new aliases
* @param additionalAliases new aliases
* @return this
*/
ExtendedClient switchIndex(IndexDefinition indexDefinition, List<String> extraAliases);
IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases);
/**
* Switch from one index to another.
* Shift from one index to another.
* @param indexDefinition the index definition
* @param extraAliases new aliases
* @param additionalAliases new aliases
* @param indexAliasAdder method to add aliases
* @return this
*/
ExtendedClient switchIndex(IndexDefinition indexDefinition, List<String> extraAliases, IndexAliasAdder indexAliasAdder);
IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases,
IndexAliasAdder indexAliasAdder);
/**
* Switch from one index to another.
*
* Shift from one index to another.
* @param index the index name
* @param fullIndexName the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
* @param additionalAliases a list of names that should be set as index aliases
* @return this
*/
ExtendedClient switchIndex(String index, String fullIndexName, List<String> extraAliases);
IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases);
/**
* Switch from one index to another.
*
* Shift from one index to another.
* @param index the index name
* @param fullIndexName the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
* @param additionalAliases a list of names that should be set as index aliases
* @param adder an adder method to create alias term queries
* @return this
*/
ExtendedClient switchIndex(String index, String fullIndexName, List<String> extraAliases, IndexAliasAdder adder);
IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases,
IndexAliasAdder adder);
/**
* Prune index.
* @param indexDefinition the index definition
* @return the index prune result
*/
void pruneIndex(IndexDefinition indexDefinition);
IndexPruneResult pruneIndex(IndexDefinition indexDefinition);
/**
* Apply retention policy to prune indices. All indices before delta should be deleted,
@ -452,8 +445,10 @@ public interface ExtendedClient {
* @param fullIndexName index name with timestamp
* @param delta timestamp delta (for index timestamps)
* @param mintokeep minimum number of indices to keep
* @param perform true if pruning should be executed, false if not
* @return the index prune result
*/
void pruneIndex(String index, String fullIndexName, int delta, int mintokeep);
IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform);
/**
* Find the timestamp of the most recently indexed document in the index.
@ -470,24 +465,4 @@ public interface ExtendedClient {
* @return the cluster name
*/
String getClusterName();
/**
* Returns true is a throwable exists.
*
* @return true if a Throwable exists
*/
boolean hasThrowable();
/**
* Return last throwable if exists.
*
* @return last throwable
*/
Throwable getThrowable();
/**
* Shutdown the client.
* @throws IOException if shutdown fails
*/
void shutdown() throws IOException;
}

@ -2,148 +2,69 @@ package org.xbib.elx.api;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.TimeUnit;
public class IndexDefinition {
public interface IndexDefinition {
private String index;
IndexDefinition setIndex(String index);
private String fullIndexName;
String getIndex();
private String dateTimePattern;
IndexDefinition setFullIndexName(String fullIndexName);
private URL settingsUrl;
String getFullIndexName();
private URL mappingsUrl;
IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException;
private boolean enabled;
IndexDefinition setSettingsUrl(URL settingsUrl);
private boolean ignoreErrors;
URL getSettingsUrl();
private boolean switchAliases;
IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException;
private boolean hasForceMerge;
IndexDefinition setMappingsUrl(URL mappingsUrl);
private int replicaLevel;
URL getMappingsUrl();
private IndexRetention indexRetention;
IndexDefinition setDateTimePattern(String timeWindow);
private String maxWaitTime;
String getDateTimePattern();
public IndexDefinition setIndex(String index) {
this.index = index;
return this;
}
IndexDefinition setEnabled(boolean enabled);
public String getIndex() {
return index;
}
boolean isEnabled();
public IndexDefinition setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName;
return this;
}
IndexDefinition setIgnoreErrors(boolean ignoreErrors);
public String getFullIndexName() {
return fullIndexName;
}
boolean ignoreErrors();
public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException {
this.settingsUrl = settingsUrlString != null ? new URL(settingsUrlString) : null;
return this;
}
IndexDefinition setShift(boolean shift);
public IndexDefinition setSettingsUrl(URL settingsUrl) {
this.settingsUrl = settingsUrl;
return this;
}
boolean isShiftEnabled();
public URL getSettingsUrl() {
return settingsUrl;
}
IndexDefinition setForceMerge(boolean hasForceMerge);
public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException {
this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null;
return this;
}
boolean hasForceMerge();
public IndexDefinition setMappingsUrl(URL mappingsUrl) {
this.mappingsUrl = mappingsUrl;
return this;
}
IndexDefinition setReplicaLevel(int replicaLevel);
public URL getMappingsUrl() {
return mappingsUrl;
}
int getReplicaLevel();
public IndexDefinition setDateTimePattern(String timeWindow) {
this.dateTimePattern = timeWindow;
return this;
}
IndexDefinition setRetention(IndexRetention indexRetention);
public String getDateTimePattern() {
return dateTimePattern;
}
IndexRetention getRetention();
public IndexDefinition setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
}
IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit);
public boolean isEnabled() {
return enabled;
}
long getMaxWaitTime();
public IndexDefinition setIgnoreErrors(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
return this;
}
TimeUnit getMaxWaitTimeUnit();
public boolean ignoreErrors() {
return ignoreErrors;
}
IndexDefinition setStartRefreshInterval(long seconds);
public IndexDefinition setSwitchAliases(boolean switchAliases) {
this.switchAliases = switchAliases;
return this;
}
long getStartRefreshInterval();
public boolean isSwitchAliases() {
return switchAliases;
}
IndexDefinition setStopRefreshInterval(long seconds);
public IndexDefinition setForceMerge(boolean hasForceMerge) {
this.hasForceMerge = hasForceMerge;
return this;
}
public boolean hasForceMerge() {
return hasForceMerge;
}
public IndexDefinition setReplicaLevel(int replicaLevel) {
this.replicaLevel = replicaLevel;
return this;
}
public int getReplicaLevel() {
return replicaLevel;
}
public IndexDefinition setRetention(IndexRetention indexRetention) {
this.indexRetention = indexRetention;
return this;
}
public IndexRetention getRetention() {
return indexRetention;
}
public IndexDefinition setMaxWaitTime(String maxWaitTime) {
this.maxWaitTime = maxWaitTime;
return this;
}
public String getMaxWaitTime() {
return maxWaitTime;
}
long getStopRefreshInterval();
}

@ -0,0 +1,16 @@
package org.xbib.elx.api;
import java.util.List;
public interface IndexPruneResult {
enum State { NOTHING_TO_DO, SUCCESS, NONE };
State getState();
List<String> getCandidateIndices();
List<String> getDeletedIndices();
boolean isAcknowledged();
}

@ -1,27 +1,13 @@
package org.xbib.elx.api;
public class IndexRetention {
public interface IndexRetention {
private int timestampDiff;
IndexRetention setDelta(int delta);
private int minToKeep;
int getDelta();
public IndexRetention setDelta(int timestampDiff) {
this.timestampDiff = timestampDiff;
return this;
}
IndexRetention setMinToKeep(int minToKeep);
public int getDelta() {
return timestampDiff;
}
public IndexRetention setMinToKeep(int minToKeep) {
this.minToKeep = minToKeep;
return this;
}
public int getMinToKeep() {
return minToKeep;
}
int getMinToKeep();
}

@ -0,0 +1,10 @@
package org.xbib.elx.api;
import java.util.List;
public interface IndexShiftResult {
List<String> getMovedAliases();
List<String> getNewAliases();
}

@ -4,7 +4,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@ -44,16 +43,12 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -62,7 +57,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
@ -71,12 +65,14 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.xbib.elx.api.BulkControl;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.api.IndexShiftResult;
import java.io.IOException;
import java.io.InputStream;
@ -99,6 +95,7 @@ import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Matcher;
@ -120,27 +117,58 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
*/
private ElasticsearchClient client;
/**
* Our replacement for the buk processor.
*/
private BulkProcessor bulkProcessor;
private BulkMetric bulkMetric;
private BulkControl bulkControl;
private BulkController bulkController;
private Throwable throwable;
private AtomicBoolean closed;
private boolean closed;
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
@Override
public List<String> getMovedAliases() {
return Collections.emptyList();
}
@Override
public List<String> getNewAliases() {
return Collections.emptyList();
}
};
private static final IndexPruneResult EMPTY_INDEX_PRUNE_RESULT = new IndexPruneResult() {
@Override
public State getState() {
return State.NONE;
}
@Override
public List<String> getCandidateIndices() {
return Collections.emptyList();
}
@Override
public List<String> getDeletedIndices() {
return Collections.emptyList();
}
@Override
public boolean isAcknowledged() {
return false;
}
};
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException;
protected AbstractExtendedClient() {
closed = new AtomicBoolean(false);
}
@Override
public AbstractExtendedClient setClient(ElasticsearchClient client) {
this.client = client;
this.bulkMetric = new DefaultBulkMetric();
bulkMetric.start();
this.bulkController = new DefaultBulkController(this, bulkMetric);
return this;
}
@ -149,28 +177,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return client;
}
@Override
public AbstractExtendedClient setBulkMetric(BulkMetric metric) {
this.bulkMetric = metric;
// you must start bulk metric or it will bail out at stop()
bulkMetric.start();
return this;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public AbstractExtendedClient setBulkControl(BulkControl bulkControl) {
this.bulkControl = bulkControl;
return this;
}
@Override
public BulkControl getBulkControl() {
return bulkControl;
public BulkController getBulkController() {
return bulkController;
}
@Override
@ -181,120 +195,33 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (bulkMetric != null) {
bulkMetric.start();
}
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
private final Logger logger = LogManager.getLogger("org.xbib.elx.BulkProcessor.Listener");
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
}
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) {
n++;
if (bulkMetric != null) {
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
}
if (bulkMetric != null) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
if (n > 0) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
} else {
if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
throwable = failure;
closed = true;
logger.error("after bulk [" + executionId + "] error", failure);
}
};
if (client != null) {
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(),
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(),
Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum());
TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(),
TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum()));
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest"));
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest);
BulkProcessor.Builder builder = BulkProcessor.builder((Client) client, listener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest);
this.bulkProcessor = builder.build();
}
this.closed = false;
this.throwable = null;
if (bulkController != null) {
bulkController.init(settings);
}
return this;
}
@Override
public synchronized void shutdown() throws IOException {
ensureActive();
if (bulkProcessor != null) {
logger.info("closing bulk processor");
bulkProcessor.close();
}
if (bulkMetric != null) {
logger.info("stopping metric before bulk stop (for precise measurement)");
bulkMetric.stop();
public void flush() throws IOException {
if (bulkController != null) {
bulkController.flush();
}
if (bulkControl != null && bulkControl.indices() != null && !bulkControl.indices().isEmpty()) {
logger.info("stopping bulk mode for indices {}...", bulkControl.indices());
for (String index : bulkControl.indices()) {
stopBulk(index, bulkControl.getMaxWaitTime());
}
@Override
public void close() throws IOException {
ensureActive();
if (closed.compareAndSet(false, true)) {
if (bulkMetric != null) {
logger.info("closing bulk metric before bulk controller (for precise measurement)");
bulkMetric.close();
}
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();
}
logger.info("shutdown complete");
}
logger.info("shutdown complete");
}
@Override
@ -320,7 +247,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException {
ensureActive();
waitForCluster("YELLOW", "30s");
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
URL indexSettings = indexDefinition.getSettingsUrl();
if (indexSettings == null) {
logger.warn("warning while creating index '{}', no settings/mappings",
@ -417,37 +344,27 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
ensureActive();
if (bulkControl == null) {
return this;
}
if (!bulkControl.isBulk(index) && startRefreshIntervalSeconds > 0L && stopRefreshIntervalSeconds > 0L) {
bulkControl.startBulk(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
updateIndexSetting(index, "refresh_interval", startRefreshIntervalSeconds + "s");
if (bulkController != null) {
ensureActive();
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
}
return this;
}
@Override
public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException {
return stopBulk(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime());
if (bulkController != null) {
ensureActive();
bulkController.stopBulkMode(indexDefinition);
}
return this;
}
@Override
public ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException {
ensureActive();
if (bulkControl == null) {
return this;
}
flushIngest();
if (waitForResponses(maxWaitTime)) {
if (bulkControl.isBulk(index)) {
long secs = bulkControl.getStopBulkRefreshIntervals().get(index);
if (secs > 0L) {
updateIndexSetting(index, "refresh_interval", secs + "s");
}
bulkControl.finishBulk(index);
}
public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
if (bulkController != null) {
ensureActive();
bulkController.stopBulkMode(index, timeout, timeUnit);
}
return this;
}
@ -465,16 +382,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(IndexRequest indexRequest) {
ensureActive();
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
bulkProcessor.add(indexRequest);
} catch (Exception e) {
throwable = e;
closed = true;
logger.error("bulk add of index request failed: " + e.getMessage(), e);
}
bulkController.index(indexRequest);
return this;
}
@ -486,16 +394,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient delete(DeleteRequest deleteRequest) {
ensureActive();
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
}
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
throwable = e;
closed = true;
logger.error("bulk add of delete failed: " + e.getMessage(), e);
}
bulkController.delete(deleteRequest);
return this;
}
@ -512,49 +411,23 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient update(UpdateRequest updateRequest) {
ensureActive();
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
}
bulkProcessor.add(updateRequest);
} catch (Exception e) {
throwable = e;
closed = true;
logger.error("bulk add of update request failed: " + e.getMessage(), e);
}
bulkController.update(updateRequest);
return this;
}
@Override
public ExtendedClient flushIngest() {
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
ensureActive();
logger.debug("flushing bulk processor");
bulkProcessor.flush();
return this;
return bulkController.waitForResponses(timeout, timeUnit);
}
@Override
public boolean waitForResponses(String maxWaitTime) {
ensureActive();
long millis = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueMinutes(1),"millis").getMillis();
logger.debug("waiting for " + millis + " millis");
try {
return bulkProcessor.awaitFlush(millis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("interrupted");
return false;
}
}
@Override
public boolean waitForRecovery(String index, String maxWaitTime) {
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureIndexGiven(index);
RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, new RecoveryRequest(index)).actionGet();
int shards = response.getTotalShards();
TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10),
getClass().getSimpleName() + ".timeout");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest(index)
.waitForActiveShards(shards).timeout(timeout)).actionGet();
@ -566,26 +439,26 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public boolean waitForCluster(String statusString, String maxWaitTime) {
public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10),
getClass().getSimpleName() + ".timeout");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
if (logger.isErrorEnabled()) {
logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
}
return false;
}
return true;
}
@Override
public String getHealthColor(String maxWaitTime) {
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
try {
TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10),
getClass().getSimpleName() + ".timeout");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().timeout(timeout)).actionGet();
ClusterHealthStatus status = healthResponse.getStatus();
@ -604,15 +477,16 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
return updateReplicaLevel(indexDefinition.getFullIndexName(), level, indexDefinition.getMaxWaitTime());
return updateReplicaLevel(indexDefinition.getFullIndexName(), level,
indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
}
@Override
public ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException {
waitForCluster("YELLOW", maxWaitTime); // let cluster settle down from critical operations
public ExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException {
waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations
if (level > 0) {
updateIndexSetting(index, "number_of_replicas", level);
waitForRecovery(index, maxWaitTime);
waitForRecovery(index, maxWaitTime, timeUnit);
}
return this;
}
@ -684,12 +558,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return indices.isEmpty() ? alias : indices.iterator().next();
}
@Override
public Map<String, String> getAliasFilters(String alias) {
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE);
return getFilters(getAliasesRequestBuilder.setIndices(resolveAlias(alias)).execute().actionGet());
}
@Override
public Map<String, String> getIndexFilters(String index) {
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE);
@ -697,50 +565,49 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public ExtendedClient switchIndex(IndexDefinition indexDefinition, List<String> extraAliases) {
return switchIndex(indexDefinition, extraAliases, null);
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases) {
return shiftIndex(indexDefinition, additionalAliases, null);
}
@Override
public ExtendedClient switchIndex(IndexDefinition indexDefinition,
List<String> extraAliases, IndexAliasAdder indexAliasAdder) {
if (extraAliases == null) {
return this;
}
if (indexDefinition.isSwitchAliases()) {
switchIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(), extraAliases.stream()
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
List<String> additionalAliases, IndexAliasAdder indexAliasAdder) {
if (additionalAliases == null) {
return EMPTY_INDEX_SHIFT_RESULT;
}
if (indexDefinition.isShiftEnabled()) {
return shiftIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(), additionalAliases.stream()
.filter(a -> a != null && !a.isEmpty())
.collect(Collectors.toList()), indexAliasAdder);
}
return this;
return EMPTY_INDEX_SHIFT_RESULT;
}
@Override
public ExtendedClient switchIndex(String index, String fullIndexName, List<String> extraAliases) {
return switchIndex(index, fullIndexName, extraAliases, null);
public IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases) {
return shiftIndex(index, fullIndexName, additionalAliases, null);
}
@Override
public ExtendedClient switchIndex(String index, String fullIndexName,
List<String> extraAliases, IndexAliasAdder adder) {
public IndexShiftResult shiftIndex(String index, String fullIndexName,
List<String> additionalAliases, IndexAliasAdder adder) {
ensureActive();
if (index.equals(fullIndexName)) {
return this; // nothing to switch to
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
// two situations: 1. there is a new alias 2. there is already an old index with the alias
String oldIndex = resolveAlias(index);
final Map<String, String> oldFilterMap = oldIndex.equals(index) ? null : getIndexFilters(oldIndex);
final List<String> newAliases = new LinkedList<>();
final List<String> switchAliases = new LinkedList<>();
final List<String> moveAliases = new LinkedList<>();
IndicesAliasesRequestBuilder requestBuilder = new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE);
if (oldFilterMap == null || !oldFilterMap.containsKey(index)) {
// never apply a filter for trunk index name
requestBuilder.addAlias(fullIndexName, index);
newAliases.add(index);
}
// switch existing aliases
// move existing aliases
if (oldFilterMap != null) {
for (Map.Entry<String, String> entry : oldFilterMap.entrySet()) {
String alias = entry.getKey();
@ -751,12 +618,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
} else {
requestBuilder.addAlias(fullIndexName, alias);
}
switchAliases.add(alias);
moveAliases.add(alias);
}
}
// a list of aliases that should be added, check if new or old
if (extraAliases != null) {
for (String extraAlias : extraAliases) {
if (additionalAliases != null) {
for (String extraAlias : additionalAliases) {
if (oldFilterMap == null || !oldFilterMap.containsKey(extraAlias)) {
// index alias adder only active on extra aliases, and if alias is new
if (adder != null) {
@ -773,82 +640,72 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
} else {
requestBuilder.addAlias(fullIndexName, extraAlias);
}
switchAliases.add(extraAlias);
moveAliases.add(extraAlias);
}
}
}
if (!newAliases.isEmpty() || !switchAliases.isEmpty()) {
logger.info("new aliases = {}, switch aliases = {}", newAliases, switchAliases);
if (!newAliases.isEmpty() || !moveAliases.isEmpty()) {
logger.info("new aliases = {}, moved aliases = {}", newAliases, moveAliases);
requestBuilder.execute().actionGet();
}
return this;
return new SuccessIndexShiftResult(moveAliases, newAliases);
}
@Override
public void pruneIndex(IndexDefinition indexDefinition) {
pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(),
indexDefinition.getRetention().getDelta(), indexDefinition.getRetention().getMinToKeep());
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(),
indexDefinition.getRetention().getDelta(), indexDefinition.getRetention().getMinToKeep(), true);
}
@Override
public void pruneIndex(String index, String fullIndexName, int delta, int mintokeep) {
public IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform) {
if (delta == 0 && mintokeep == 0) {
return;
return EMPTY_INDEX_PRUNE_RESULT;
}
ensureActive();
if (index.equals(fullIndexName)) {
return;
return EMPTY_INDEX_PRUNE_RESULT;
}
ensureActive();
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> indices = new TreeSet<>();
logger.info("{} indices", getIndexResponse.getIndices().length);
List<String> candidateIndices = new ArrayList<>();
for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s);
if (m.matches() && index.equals(m.group(1)) && !s.equals(fullIndexName)) {
indices.add(s);
candidateIndices.add(s);
}
}
if (indices.isEmpty()) {
logger.info("no indices found, retention policy skipped");
return;
if (candidateIndices.isEmpty()) {
return EMPTY_INDEX_PRUNE_RESULT;
}
if (mintokeep > 0 && indices.size() <= mintokeep) {
logger.info("{} indices found, not enough for retention policy ({}), skipped",
indices.size(), mintokeep);
return;
} else {
logger.info("candidates for deletion = {}", indices);
if (mintokeep > 0 && candidateIndices.size() <= mintokeep) {
return new NothingToDoPruneResult(candidateIndices, Collections.emptyList());
}
List<String> indicesToDelete = new ArrayList<>();
// our index
Matcher m1 = pattern.matcher(fullIndexName);
if (m1.matches()) {
Integer i1 = Integer.parseInt(m1.group(2));
for (String s : indices) {
for (String s : candidateIndices) {
Matcher m2 = pattern.matcher(s);
if (m2.matches()) {
Integer i2 = Integer.parseInt(m2.group(2));
int kept = indices.size() - indicesToDelete.size();
int kept = candidateIndices.size() - indicesToDelete.size();
if ((delta == 0 || (delta > 0 && i1 - i2 > delta)) && mintokeep <= kept) {
indicesToDelete.add(s);
}
}
}
}
logger.info("indices to delete = {}", indicesToDelete);
if (indicesToDelete.isEmpty()) {
logger.info("not enough indices found to delete, retention policy complete");
return;
return new NothingToDoPruneResult(candidateIndices, indicesToDelete);
}
String[] s = new String[indicesToDelete.size()];
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest()
.indices(indicesToDelete.toArray(s));
DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
if (!response.isAcknowledged()) {
logger.warn("retention delete index operation was not acknowledged");
}
return new SuccessPruneResult(candidateIndices, indicesToDelete, response);
}
@Override
@ -875,15 +732,15 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public boolean forceMerge(IndexDefinition indexDefinition) {
if (indexDefinition.hasForceMerge()) {
return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime());
return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime(),
indexDefinition.getMaxWaitTimeUnit());
}
return false;
}
@Override
public boolean forceMerge(String index, String maxWaitTime) {
TimeValue timeout = TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(10),
getClass().getSimpleName() + ".timeout");
public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ForceMergeRequestBuilder forceMergeRequestBuilder =
new ForceMergeRequestBuilder(client, ForceMergeAction.INSTANCE);
forceMergeRequestBuilder.setIndices(index);
@ -909,44 +766,35 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
String fullIndexName;
String dateTimePattern = settings.get("dateTimePattern");
if (dateTimePattern != null) {
fullIndexName = resolveAlias(indexName +
DateTimeFormatter.ofPattern(dateTimePattern)
// check if index name with current date already exists, resolve to it
fullIndexName = resolveAlias(indexName + DateTimeFormatter.ofPattern(dateTimePattern)
.withZone(ZoneId.systemDefault()) // not GMT
.format(LocalDate.now()));
logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName);
} else {
// check if index name already exists, resolve to it
fullIndexName = resolveMostRecentIndex(indexName);
logger.info("index name {} resolved to full index name = {}", indexName, fullIndexName);
}
IndexRetention indexRetention = new IndexRetention()
IndexRetention indexRetention = new DefaultIndexRetention()
.setMinToKeep(settings.getAsInt("retention.mintokeep", 0))
.setDelta(settings.getAsInt("retention.delta", 0));
return new IndexDefinition()
return new DefaultIndexDefinition()
.setEnabled(isEnabled)
.setIndex(indexName)
.setFullIndexName(fullIndexName)
.setSettingsUrl(settings.get("settings"))
.setMappingsUrl(settings.get("mapping"))
.setDateTimePattern(dateTimePattern)
.setEnabled(isEnabled)
.setIgnoreErrors(settings.getAsBoolean("skiperrors", false))
.setSwitchAliases(settings.getAsBoolean("aliases", true))
.setShift(settings.getAsBoolean("shift", true))
.setReplicaLevel(settings.getAsInt("replica", 0))
.setMaxWaitTime(settings.get("timout", "30s"))
.setRetention(indexRetention);
.setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS)
.setRetention(indexRetention)
.setStartRefreshInterval(settings.getAsLong("bulk.startrefreshinterval", -1L))
.setStopRefreshInterval(settings.getAsLong("bulk.stoprefreshinterval", -1L));
}
@Override
public boolean hasThrowable() {
return throwable != null;
}
@Override
public Throwable getThrowable() {
return throwable;
}
private void updateIndexSetting(String index, String key, Object value) throws IOException {
public void updateIndexSetting(String index, String key, Object value) throws IOException {
ensureActive();
if (index == null) {
throw new IOException("no index name given");
@ -971,9 +819,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (client == null) {
throw new IllegalStateException("no client");
}
if (closed) {
throw new ElasticsearchException("client is closed");
}
}
private void ensureIndexGiven(String index) {
@ -1096,4 +941,115 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
.forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
return result;
}
private static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) {
switch (timeUnit) {
case DAYS:
return TimeValue.timeValueHours(24 * timeValue);
case HOURS:
return TimeValue.timeValueHours(timeValue);
case MINUTES:
return TimeValue.timeValueMinutes(timeValue);
case SECONDS:
return TimeValue.timeValueSeconds(timeValue);
case MILLISECONDS:
return TimeValue.timeValueMillis(timeValue);
case MICROSECONDS:
return TimeValue.timeValueNanos(1000 * timeValue);
case NANOSECONDS:
return TimeValue.timeValueNanos(timeValue);
default:
throw new IllegalArgumentException("unknown time unit: " + timeUnit);
}
}
private static class SuccessIndexShiftResult implements IndexShiftResult {
List<String> movedAliases;
List<String> newAliases;
SuccessIndexShiftResult(List<String> movedAliases, List<String> newAliases) {
this.movedAliases = movedAliases;
this.newAliases = newAliases;
}
@Override
public List<String> getMovedAliases() {
return movedAliases;
}
@Override
public List<String> getNewAliases() {
return newAliases;
}
}
private static class SuccessPruneResult implements IndexPruneResult {
List<String> candidateIndices;
List<String> indicesToDelete;
DeleteIndexResponse response;
SuccessPruneResult(List<String> candidateIndices, List<String> indicesToDelete,
DeleteIndexResponse response) {
this.candidateIndices = candidateIndices;
this.indicesToDelete = indicesToDelete;
this.response = response;
}
@Override
public IndexPruneResult.State getState() {
return IndexPruneResult.State.SUCCESS;
}
@Override
public List<String> getCandidateIndices() {
return candidateIndices;
}
@Override
public List<String> getDeletedIndices() {
return indicesToDelete;
}
@Override
public boolean isAcknowledged() {
return response.isAcknowledged();
}
}
private static class NothingToDoPruneResult implements IndexPruneResult {
List<String> candidateIndices;
List<String> indicesToDelete;
NothingToDoPruneResult(List<String> candidateIndices, List<String> indicesToDelete) {
this.candidateIndices = candidateIndices;
this.indicesToDelete = indicesToDelete;
}
@Override
public IndexPruneResult.State getState() {
return IndexPruneResult.State.SUCCESS;
}
@Override
public List<String> getCandidateIndices() {
return candidateIndices;
}
@Override
public List<String> getDeletedIndices() {
return indicesToDelete;
}
@Override
public boolean isAcknowledged() {
return false;
}
}
}

@ -5,8 +5,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkControl;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.ExtendedClientProvider;
@ -26,10 +24,6 @@ public class ClientBuilder {
private Class<? extends ExtendedClientProvider> provider;
private BulkMetric metric;
private BulkControl control;
public ClientBuilder() {
this(null);
}
@ -48,8 +42,6 @@ public class ClientBuilder {
for (ExtendedClientProvider provider : serviceLoader) {
providerMap.put(provider.getClass(), provider);
}
this.metric = new SimpleBulkMetric();
this.control = new SimpleBulkControl();
}
public static ClientBuilder builder() {
@ -100,25 +92,11 @@ public class ClientBuilder {
return this;
}
public ClientBuilder setMetric(BulkMetric metric) {
this.metric = metric;
return this;
}
public ClientBuilder setControl(BulkControl control) {
this.control = control;
return this;
}
@SuppressWarnings("unchecked")
public <C extends ExtendedClient> C build() throws IOException {
if (provider == null) {
throw new IllegalArgumentException("no provider");
}
return (C) providerMap.get(provider).getExtendedClient()
.setClient(client)
.setBulkMetric(metric)
.setBulkControl(control)
.init(settingsBuilder.build());
return (C) providerMap.get(provider).getExtendedClient().setClient(client).init(settingsBuilder.build());
}
}

@ -0,0 +1,309 @@
package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class DefaultBulkController implements BulkController {
private static final Logger logger = LogManager.getLogger(DefaultBulkController.class);
private final ExtendedClient client;
private final BulkMetric bulkMetric;
private final List<String> indexNames;
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals;
private long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private BulkProcessor bulkProcessor;
private BulkListener bulkListener;
private AtomicBoolean active;
public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
this.client = client;
this.bulkMetric = bulkMetric;
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
this.stopBulkRefreshIntervals = new HashMap<>();
this.maxWaitTime = 30L;
this.maxWaitTimeUnit = TimeUnit.SECONDS;
}
@Override
public Throwable getLastBulkError() {
return bulkListener.getLastBulkError();
}
@Override
public void init(Settings settings) {
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(),
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(),
Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum());
TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(),
TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum()));
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest"));
if (logger.isInfoEnabled()) {
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest);
}
this.bulkListener = new BulkListener();
DefaultBulkProcessor.Builder builder = DefaultBulkProcessor.builder((Client) client.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest);
this.bulkProcessor = builder.build();
this.active.set(true);
}
@Override
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(),
indexDefinition.getStopRefreshInterval());
}
@Override
public void startBulkMode(String indexName,
long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException {
if (!indexNames.contains(indexName)) {
indexNames.add(indexName);
startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds);
stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds);
if (startRefreshIntervalInSeconds != 0L) {
client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s");
}
}
}
@Override
public void index(IndexRequest indexRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
bulkProcessor.add(indexRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of index failed: " + e.getMessage(), e);
}
}
}
@Override
public void delete(DeleteRequest deleteRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
}
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of delete failed: " + e.getMessage(), e);
}
}
}
@Override
public void update(UpdateRequest updateRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
}
bulkProcessor.add(updateRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of update failed: " + e.getMessage(), e);
}
}
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
try {
return bulkProcessor.awaitFlush(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("interrupted");
return false;
}
}
@Override
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
stopBulkMode(indexDefinition.getFullIndexName(),
indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
}
@Override
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
flush();
if (waitForResponses(timeout, timeUnit)) {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
client.updateIndexSetting(index, "refresh_interval", secs + "s");
}
indexNames.remove(index);
}
}
}
@Override
public void flush() throws IOException {
if (bulkProcessor != null) {
bulkProcessor.flush();
}
}
@Override
public void close() throws IOException {
flush();
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L)
client.updateIndexSetting(index, "refresh_interval", secs + "s");
}
indexNames.clear();
}
if (bulkProcessor != null) {
bulkProcessor.close();
}
}
private class BulkListener implements DefaultBulkProcessor.Listener {
private final Logger logger = LogManager.getLogger("org.xbib.elx.BulkProcessor.Listener");
private Throwable lastBulkError = null;
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
}
if (logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) {
n++;
if (bulkMetric != null) {
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
}
if (bulkMetric != null && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
if (n > 0) {
if (logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
}
} else {
if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
lastBulkError = failure;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);
}
}
Throwable getLastBulkError() {
return lastBulkError;
}
}
}

@ -1,5 +1,6 @@
package org.xbib.elx.common;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.Count;
import org.xbib.metrics.CountMetric;
@ -7,9 +8,8 @@ import org.xbib.metrics.Meter;
import org.xbib.metrics.Metered;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SimpleBulkMetric implements BulkMetric {
public class DefaultBulkMetric implements BulkMetric {
private final Meter totalIngest;
@ -29,12 +29,8 @@ public class SimpleBulkMetric implements BulkMetric {
private Long stopped;
public SimpleBulkMetric() {
this(Executors.newSingleThreadScheduledExecutor());
}
public SimpleBulkMetric(ScheduledExecutorService executorService) {
totalIngest = new Meter(executorService);
public DefaultBulkMetric() {
totalIngest = new Meter(Executors.newSingleThreadScheduledExecutor());
totalIngestSizeInBytes = new CountMetric();
currentIngest = new CountMetric();
currentIngestNumDocs = new CountMetric();
@ -43,6 +39,11 @@ public class SimpleBulkMetric implements BulkMetric {
failed = new CountMetric();
}
@Override
public void init(Settings settings) {
start();
}
@Override
public Metered getTotalIngest() {
return totalIngest;
@ -78,6 +79,11 @@ public class SimpleBulkMetric implements BulkMetric {
return failed;
}
@Override
public long elapsed() {
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;
}
@Override
public void start() {
this.started = System.nanoTime();
@ -91,8 +97,8 @@ public class SimpleBulkMetric implements BulkMetric {
}
@Override
public long elapsed() {
return (stopped != null ? stopped : System.nanoTime()) - started;
public void close() {
stop();
totalIngest.shutdown();
}
}

@ -5,16 +5,14 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.xbib.elx.api.BulkProcessor;
import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -28,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
* requests allowed to be executed in parallel.
* In order to create a new bulk processor, use the {@link Builder}.
*/
public class BulkProcessor implements Closeable {
public class DefaultBulkProcessor implements BulkProcessor {
private final int bulkActions;
@ -46,8 +44,8 @@ public class BulkProcessor implements Closeable {
private volatile boolean closed;
private BulkProcessor(Client client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
private DefaultBulkProcessor(Client client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
@ -77,19 +75,6 @@ public class BulkProcessor implements Closeable {
return new Builder(client, listener);
}
/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
*/
@Override
public void close() {
try {
// 0 = immediate close
awaitClose(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
}
}
/**
* Wait for bulk request handler with flush.
* @param timeout the timeout value
@ -97,7 +82,8 @@ public class BulkProcessor implements Closeable {
* @return true is method was successful, false if timeout
* @throws InterruptedException if timeout
*/
public boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
@Override
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
}
@ -124,6 +110,7 @@ public class BulkProcessor implements Closeable {
* bulk requests completed
* @throws InterruptedException If the current thread is interrupted
*/
@Override
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
@ -140,46 +127,51 @@ public class BulkProcessor implements Closeable {
}
/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
* Adds either a delete or an index request.
*
* @param request request
* @return his bulk processor
*/
public BulkProcessor add(IndexRequest request) {
return add((ActionRequest) request);
@Override
public DefaultBulkProcessor add(ActionRequest<?> request) {
return add(request, null);
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
* Adds either a delete or an index request with a payload.
*
* @param request request
* @param payload payload
* @return his bulk processor
*/
public BulkProcessor add(DeleteRequest request) {
return add((ActionRequest) request);
@Override
public DefaultBulkProcessor add(ActionRequest<?> request, Object payload) {
internalAdd(request, payload);
return this;
}
/**
* Adds either a delete or an index request.
*
* @param request request
* @return his bulk processor
* Flush pending delete or index requests.
*/
public BulkProcessor add(ActionRequest<?> request) {
return add(request, null);
@Override
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
}
/**
* Adds either a delete or an index request with a payload.
*
* @param request request
* @param payload payload
* @return his bulk processor
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
*/
public BulkProcessor add(ActionRequest<?> request, Object payload) {
internalAdd(request, payload);
return this;
@Override
public void close() {
try {
// 0 = immediate close
awaitClose(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
}
}
private void ensureOpen() {
@ -213,53 +205,7 @@ public class BulkProcessor implements Closeable {
return bulkActions != -1 &&
bulkRequest.numberOfActions() >= bulkActions ||
bulkSize != -1 &&
bulkRequest.estimatedSizeInBytes() >= bulkSize;
}
/**
* Flush pending delete or index requests.
*/
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
}
/**
* A listener for the execution.
*/
public interface Listener {
/**
* Callback before the bulk is executed.
*
* @param executionId execution ID
* @param request request
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of bulk request.
*
* @param executionId execution ID
* @param request request
* @param response response
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of bulk request.
*
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*
* @param executionId execution ID
* @param request request
* @param failure failure
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
bulkRequest.estimatedSizeInBytes() >= bulkSize;
}
/**
@ -359,8 +305,8 @@ public class BulkProcessor implements Closeable {
*
* @return a bulk processor
*/
public BulkProcessor build() {
return new BulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
public DefaultBulkProcessor build() {
return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
@ -368,7 +314,7 @@ public class BulkProcessor implements Closeable {
@Override
public void run() {
synchronized (BulkProcessor.this) {
synchronized (DefaultBulkProcessor.this) {
if (closed) {
return;
}
@ -380,24 +326,13 @@ public class BulkProcessor implements Closeable {
}
}
/**
* Abstracts the low-level details of bulk request handling.
*/
interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}
private static class SyncBulkRequestHandler implements BulkRequestHandler {
private final Client client;
private final BulkProcessor.Listener listener;
private final DefaultBulkProcessor.Listener listener;
SyncBulkRequestHandler(Client client, BulkProcessor.Listener listener) {
SyncBulkRequestHandler(Client client, DefaultBulkProcessor.Listener listener) {
this.client = client;
this.listener = listener;
}
@ -427,13 +362,13 @@ public class BulkProcessor implements Closeable {
private final Client client;
private final BulkProcessor.Listener listener;
private final DefaultBulkProcessor.Listener listener;
private final Semaphore semaphore;
private final int concurrentRequests;
private AsyncBulkRequestHandler(Client client, BulkProcessor.Listener listener, int concurrentRequests) {
private AsyncBulkRequestHandler(Client client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
this.client = client;
this.listener = listener;
this.concurrentRequests = concurrentRequests;

@ -0,0 +1,214 @@
package org.xbib.elx.common;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexRetention;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.TimeUnit;
public class DefaultIndexDefinition implements IndexDefinition {
private String index;
private String fullIndexName;
private String dateTimePattern;
private URL settingsUrl;
private URL mappingsUrl;
private boolean enabled;
private boolean ignoreErrors;
private boolean switchAliases;
private boolean hasForceMerge;
private int replicaLevel;
private IndexRetention indexRetention;
private long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private long startRefreshInterval;
private long stopRefreshInterval;
@Override
public IndexDefinition setIndex(String index) {
this.index = index;
return this;
}
@Override
public String getIndex() {
return index;
}
@Override
public IndexDefinition setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName;
return this;
}
@Override
public String getFullIndexName() {
return fullIndexName;
}
@Override
public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException {
this.settingsUrl = settingsUrlString != null ? new URL(settingsUrlString) : null;
return this;
}
@Override
public IndexDefinition setSettingsUrl(URL settingsUrl) {
this.settingsUrl = settingsUrl;
return this;
}
@Override
public URL getSettingsUrl() {
return settingsUrl;
}
@Override
public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException {
this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null;
return this;
}
@Override
public IndexDefinition setMappingsUrl(URL mappingsUrl) {
this.mappingsUrl = mappingsUrl;
return this;
}
@Override
public URL getMappingsUrl() {
return mappingsUrl;
}
@Override
public IndexDefinition setDateTimePattern(String timeWindow) {
this.dateTimePattern = timeWindow;
return this;
}
@Override
public String getDateTimePattern() {
return dateTimePattern;
}
@Override
public IndexDefinition setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public IndexDefinition setIgnoreErrors(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
return this;
}
@Override
public boolean ignoreErrors() {
return ignoreErrors;
}
@Override
public IndexDefinition setShift(boolean switchAliases) {
this.switchAliases = switchAliases;
return this;
}
@Override
public boolean isShiftEnabled() {
return switchAliases;
}
@Override
public IndexDefinition setForceMerge(boolean hasForceMerge) {
this.hasForceMerge = hasForceMerge;
return this;
}
@Override
public boolean hasForceMerge() {
return hasForceMerge;
}
@Override
public IndexDefinition setReplicaLevel(int replicaLevel) {
this.replicaLevel = replicaLevel;
return this;
}
@Override
public int getReplicaLevel() {
return replicaLevel;
}
@Override
public IndexDefinition setRetention(IndexRetention indexRetention) {
this.indexRetention = indexRetention;
return this;
}
@Override
public IndexRetention getRetention() {
return indexRetention;
}
@Override
public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) {
this.maxWaitTime = maxWaitTime;
this.maxWaitTimeUnit = timeUnit;
return this;
}
@Override
public long getMaxWaitTime() {
return maxWaitTime;
}
@Override
public TimeUnit getMaxWaitTimeUnit() {
return maxWaitTimeUnit;
}
@Override
public IndexDefinition setStartRefreshInterval(long seconds) {
this.startRefreshInterval = seconds;
return this;
}
@Override
public long getStartRefreshInterval() {
return startRefreshInterval;
}
@Override
public IndexDefinition setStopRefreshInterval(long seconds) {
this.stopRefreshInterval = seconds;
return this;
}
@Override
public long getStopRefreshInterval() {
return stopRefreshInterval;
}
}

@ -0,0 +1,32 @@
package org.xbib.elx.common;
import org.xbib.elx.api.IndexRetention;
public class DefaultIndexRetention implements IndexRetention {
private int delta;
private int minToKeep;
@Override
public IndexRetention setDelta(int delta) {
this.delta = delta;
return this;
}
@Override
public int getDelta() {
return delta;
}
@Override
public IndexRetention setMinToKeep(int minToKeep) {
this.minToKeep = minToKeep;
return this;
}
@Override
public int getMinToKeep() {
return minToKeep;
}
}

@ -6,8 +6,10 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.TimeUnit;
/**
* Mock client, it does not perform actions on a cluster. Useful for testing or dry runs.
* Mock client, it does not perform any actions on a cluster. Useful for testing.
*/
public class MockExtendedClient extends AbstractExtendedClient {
@ -56,18 +58,13 @@ public class MockExtendedClient extends AbstractExtendedClient {
return this;
}
@Override
public MockExtendedClient flushIngest() {
return this;
}
@Override
public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) {
return this;
}
@Override
public MockExtendedClient stopBulk(String index, String maxWaitTime) {
public MockExtendedClient stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) {
return this;
}
@ -92,32 +89,37 @@ public class MockExtendedClient extends AbstractExtendedClient {
}
@Override
public boolean forceMerge(String index, String maxWaitTime) {
public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) {
return true;
}
@Override
public boolean waitForCluster(String healthColor, String timeValue) {
public boolean waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) {
return true;
}
@Override
public boolean waitForResponses(String maxWaitTime) {
public boolean waitForResponses(long maxWaitTime, TimeUnit timeUnit) {
return true;
}
@Override
public boolean waitForRecovery(String index, String maxWaitTime) {
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
return true;
}
@Override
public MockExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) {
public MockExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) {
return this;
}
@Override
public void shutdown() {
public void flush() {
// nothing to do
}
@Override
public void close() {
// nothing to do
}
}

@ -1,66 +0,0 @@
package org.xbib.elx.common;
import org.xbib.elx.api.BulkControl;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
*/
public class SimpleBulkControl implements BulkControl {
private final Set<String> indexNames;
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals;
private String maxWaitTime;
public SimpleBulkControl() {
indexNames = new HashSet<>();
startBulkRefreshIntervals = new HashMap<>();
stopBulkRefreshIntervals = new HashMap<>();
maxWaitTime = "30s";
}
@Override
public void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval) {
indexNames.add(indexName);
startBulkRefreshIntervals.put(indexName, startRefreshInterval);
stopBulkRefreshIntervals.put(indexName, stopRefreshInterval);
}
@Override
public boolean isBulk(String indexName) {
return indexNames.contains(indexName);
}
@Override
public void finishBulk(String indexName) {
indexNames.remove(indexName);
}
@Override
public Set<String> indices() {
return indexNames;
}
@Override
public Map<String, Long> getStartBulkRefreshIntervals() {
return startBulkRefreshIntervals;
}
@Override
public Map<String, Long> getStopBulkRefreshIntervals() {
return stopBulkRefreshIntervals;
}
@Override
public String getMaxWaitTime() {
return maxWaitTime;
}
}

@ -1 +1,4 @@
package org.xbib.elx.common.io;
/**
* I/O helpers for Elasticsearch client extensions.
*/
package org.xbib.elx.common.io;

@ -1 +1,4 @@
package org.xbib.elx.common.util;
/**
* Utilities for Elasticsearch client extensions.
*/
package org.xbib.elx.common.util;

@ -47,10 +47,9 @@ public class ExtendedNodeClient extends AbstractExtendedClient {
return null;
}
@Override
public void shutdown() throws IOException {
super.shutdown();
public void close() throws IOException {
super.close();
try {
if (node != null) {
logger.debug("closing node...");

@ -0,0 +1,4 @@
/**
* Node client extensions.
*/
package org.xbib.elx.node;

@ -2,7 +2,7 @@ package org.xbib.elx.node;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.logging.log4j.LogManager;
@ -27,9 +27,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExtendedNodeClientTest extends NodeTestUtils {
public class ClientTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeClientTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ClientTest.class.getSimpleName());
private static final Long ACTIONS = 25000L;
@ -55,17 +55,17 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
try {
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
client.close();
}
}
@ -76,11 +76,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
client.newIndex("test");
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
client.close();
}
@Test
@ -105,11 +101,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
client.close();
}
@Test
@ -125,22 +117,22 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
for (int i = 0; i < ACTIONS; i++) {
client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(numactions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
client.shutdown();
client.close();
}
}
@ -172,9 +164,9 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
}
logger.info("waiting for latch...");
if (latch.await(5, TimeUnit.MINUTES)) {
logger.info("last flush...");
client.flushIngest();
client.waitForResponses("60s");
logger.info("flush...");
client.flush();
client.waitForResponses(60L, TimeUnit.SECONDS);
logger.info("got all responses, pool shutdown...");
pool.shutdown();
logger.info("pool is shut down");
@ -184,18 +176,18 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test", "30s");
client.stopBulk("test", 30L, TimeUnit.SECONDS);
assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(maxthreads * actions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
client.shutdown();
client.close();
}
}
}

@ -4,17 +4,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@Ignore
public class ExtendedNodeClusterBlockTest extends NodeTestUtils {
public class ClusterBlockTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger("test");
@ -23,7 +22,6 @@ public class ExtendedNodeClusterBlockTest extends NodeTestUtils {
try {
setClusterName();
startNode("1");
findNodeAddress();
// do not wait for green health state
logger.info("ready");
} catch (Throwable t) {
@ -41,11 +39,11 @@ public class ExtendedNodeClusterBlockTest extends NodeTestUtils {
@Test(expected = ClusterBlockException.class)
public void testClusterBlock() throws Exception {
BulkRequestBuilder brb = client("1").prepareBulk();
XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject();
IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1").setSource(builder);
Client client = client("1");
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field1", "value1").endObject();
IndexRequestBuilder irb = client.prepareIndex("test", "test", "1").setSource(builder);
BulkRequestBuilder brb = client.prepareBulk();
brb.add(irb);
brb.execute().actionGet();
}
}

@ -10,13 +10,14 @@ import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
@Ignore
public class ExtendeNodeDuplicateIDTest extends NodeTestUtils {
public class DuplicateIDTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendeNodeDuplicateIDTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName());
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
@ -34,8 +35,8 @@ public class ExtendeNodeDuplicateIDTest extends NodeTestUtils {
for (int i = 0; i < ACTIONS; i++) {
client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test")
@ -47,12 +48,12 @@ public class ExtendeNodeDuplicateIDTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.shutdown();
client.close();
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -1,63 +0,0 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@Ignore
public class ExtendedNodeUpdateReplicaLevelTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeUpdateReplicaLevelTest.class.getSimpleName());
@Test
public void testUpdateReplicaLevel() throws Exception {
long numberOfShards = 2;
int replicaLevel = 3;
// we need 3 nodes for replica level 3
startNode("2");
startNode("3");
long shardsAfterReplica;
Settings settings = Settings.settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.build();
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
try {
client.newIndex("replicatest", settings, new HashMap<>());
client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) {
client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.updateReplicaLevel("replicatest", replicaLevel, "30s");
//assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.shutdown();
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
}
}
}

@ -11,16 +11,18 @@ import org.xbib.elx.common.ClientBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Ignore
public class ExtendedNodeIndexAliasTest extends NodeTestUtils {
public class IndexShiftTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeIndexAliasTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getSimpleName());
@Test
public void testIndexAlias() throws Exception {
public void testIndexShift() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
@ -29,37 +31,47 @@ public class ExtendedNodeIndexAliasTest extends NodeTestUtils {
for (int i = 0; i < 1; i++) {
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.flush();
client.refreshIndex("test1234");
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.switchIndex("test", "test1234", simpleAliases);
client.shiftIndex("test", "test1234", simpleAliases);
client.newIndex("test5678");
for (int i = 0; i < 1; i++) {
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.flush();
client.refreshIndex("test5678");
simpleAliases = Arrays.asList("d", "e", "f");
client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
client.shiftIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> aliases = client.getIndexFilters("test5678");
logger.info("aliases of index test5678 = {}", aliases);
Map<String, String> indexFilters = client.getIndexFilters("test5678");
logger.info("aliases of index test5678 = {}", indexFilters);
assertTrue(indexFilters.containsKey("a"));
assertTrue(indexFilters.containsKey("b"));
assertTrue(indexFilters.containsKey("c"));
assertTrue(indexFilters.containsKey("d"));
assertTrue(indexFilters.containsKey("e"));
aliases = client.getAliasFilters("test");
Map<String, String> aliases = client.getIndexFilters(client.resolveAlias("test"));
logger.info("aliases of alias test = {}", aliases);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.waitForResponses("30s");
client.shutdown();
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
client.waitForResponses(30L, TimeUnit.SECONDS);
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -63,9 +63,7 @@ public class NodeTestUtils {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
@Before
@ -74,7 +72,6 @@ public class NodeTestUtils {
logger.info("starting");
setClusterName();
startNode("1");
findNodeAddress();
try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
@ -160,18 +157,6 @@ public class NodeTestUtils {
logger.info("all nodes closed");
}
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress();
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
String host = address.address().getHostName();
int port = address.address().getPort();
}
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put(getNodeSettings())

@ -19,15 +19,16 @@ import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@Ignore
public class ExtendedNodeReplicaTest extends NodeTestUtils {
public class ReplicaTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeReplicaTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName());
@Test
public void testReplicaLevel() throws Exception {
@ -54,15 +55,15 @@ public class ExtendedNodeReplicaTest extends NodeTestUtils {
try {
client.newIndex("test1", settingsTest1, new HashMap<>())
.newIndex("test2", settingsTest2, new HashMap<>());
client.waitForCluster("GREEN", "30s");
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
for (int i = 0; i < 1234; i++) {
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
for (int i = 0; i < 1234; i++) {
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
@ -97,11 +98,51 @@ public class ExtendedNodeReplicaTest extends NodeTestUtils {
} catch (Exception e) {
logger.error("delete index failed, ignored. Reason:", e);
}
client.shutdown();
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
}
}
@Test
public void testUpdateReplicaLevel() throws Exception {
long numberOfShards = 2;
int replicaLevel = 3;
// we need 3 nodes for replica level 3
startNode("2");
startNode("3");
Settings settings = Settings.settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.build();
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
try {
client.newIndex("replicatest", settings, new HashMap<>());
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
for (int i = 0; i < 12345; i++) {
client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS);
assertEquals(replicaLevel, client.getReplicaLevel("replicatest"));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
}
}

@ -6,16 +6,16 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.SimpleBulkControl;
import org.xbib.elx.common.SimpleBulkMetric;
import org.xbib.elx.api.IndexDefinition;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class ExtendedNodeSmokeTest extends NodeTestUtils {
public class SmokeTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeSmokeTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(SmokeTest.class.getSimpleName());
@Test
public void smokeTest() throws Exception {
@ -23,21 +23,19 @@ public class ExtendedNodeSmokeTest extends NodeTestUtils {
.provider(ExtendedNodeClientProvider.class)
.build();
try {
client.setBulkControl(new SimpleBulkControl());
client.setBulkMetric(new SimpleBulkMetric());
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
assertEquals(clusterName, client.getClusterName());
client.checkMapping("test");
client.update("test", "1", "{ \"name\" : \"Another name\"}");
client.flushIngest();
client.flush();
client.waitForRecovery("test", "10s");
client.waitForRecovery("test", 10L, TimeUnit.SECONDS);
client.delete("test", "1");
client.deleteIndex("test");
@ -47,7 +45,7 @@ public class ExtendedNodeSmokeTest extends NodeTestUtils {
assertEquals(0, indexDefinition.getReplicaLevel());
client.newIndex(indexDefinition);
client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
client.flushIngest();
client.flush();
client.updateReplicaLevel(indexDefinition, 2);
int replica = client.getReplicaLevel(indexDefinition);
@ -59,11 +57,11 @@ public class ExtendedNodeSmokeTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -74,15 +74,15 @@ public class ExtendedTransportClient extends AbstractExtendedClient {
}
@Override
public synchronized void shutdown() throws IOException {
super.shutdown();
logger.info("shutting down...");
public synchronized void close() throws IOException {
super.close();
logger.info("closing");
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
client.close();
client.threadPool().shutdown();
}
logger.info("shutting down completed");
logger.info("close completed");
}
private Collection<InetSocketTransportAddress> findAddresses(Settings settings) throws IOException {

@ -308,7 +308,9 @@ public class TransportClient extends AbstractClient {
transportService.connectToNode(node);
} catch (Exception e) {
it.remove();
logger.debug("failed to connect to discovered node [" + node + "]", e);
if (logger.isDebugEnabled()) {
logger.debug("failed to connect to discovered node [" + node + "]", e);
}
}
}
}

@ -25,12 +25,12 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ExtendedTransportClientTest extends NodeTestUtils {
public class ClientTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportClientTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ClientTest.class.getSimpleName());
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
@ -54,10 +54,6 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();
client.newIndex("test");
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
try {
client.deleteIndex("test")
.newIndex("test")
@ -65,11 +61,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) {
logger.error("no node available");
} finally {
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
client.close();
}
}
@ -84,17 +76,17 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
try {
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}");
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
client.close();
}
}
@ -121,11 +113,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
client.close();
}
@Test
@ -142,17 +130,17 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
for (int i = 0; i < ACTIONS; i++) {
client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
client.close();
}
}
@ -190,21 +178,21 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
logger.info("waiting for latch...");
if (latch.await(60, TimeUnit.SECONDS)) {
logger.info("flush ...");
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
logger.info("pool to be shut down ...");
pool.shutdown();
logger.info("poot shut down");
}
client.stopBulk("test", "30s");
client.stopBulk("test", 30L, TimeUnit.SECONDS);
assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
// extra search lookup
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
@ -214,7 +202,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
.setSize(0);
assertEquals(maxthreads * maxloop,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
client.shutdown();
client.close();
}
}
}

@ -9,12 +9,16 @@ import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ExtendedTransportDuplicateIDTest extends NodeTestUtils {
public class DuplicateIDTest extends NodeTestUtils {
private final static Logger logger = LogManager.getLogger(ExtendedTransportDuplicateIDTest.class.getSimpleName());
private final static Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName());
private final static Long MAX_ACTIONS_PER_REQUEST = 1000L;
@ -33,8 +37,8 @@ public class ExtendedTransportDuplicateIDTest extends NodeTestUtils {
for (int i = 0; i < ACTIONS; i++) {
client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test")
@ -46,12 +50,12 @@ public class ExtendedTransportDuplicateIDTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.shutdown();
client.close();
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -1,61 +0,0 @@
package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ExtendedTransportUpdateReplicaLevelTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportUpdateReplicaLevelTest.class.getSimpleName());
@Test
public void testUpdateReplicaLevel() throws Exception {
long numberOfShards = 2;
int replicaLevel = 3;
// we need 3 nodes for replica level 3
startNode("2");
startNode("3");
int shardsAfterReplica;
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.build();
Settings settings = Settings.settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.build();
try {
client.newIndex("replicatest", settings, new HashMap<>());
client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) {
client.index("replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.updateReplicaLevel("replicatest", replicaLevel, "30s");
//assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.shutdown();
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
}
}
}

@ -10,13 +10,14 @@ import org.xbib.elx.common.ClientBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
public class IndexShiftTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportIndexAliasTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getSimpleName());
@Test
public void testIndexAlias() throws Exception {
@ -28,21 +29,21 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
for (int i = 0; i < 1; i++) {
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.flush();
client.refreshIndex("test1234");
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.switchIndex("test", "test1234", simpleAliases);
client.shiftIndex("test", "test1234", simpleAliases);
client.newIndex("test5678");
for (int i = 0; i < 1; i++) {
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.flush();
client.refreshIndex("test5678");
simpleAliases = Arrays.asList("d", "e", "f");
client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
client.shiftIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> indexFilters = client.getIndexFilters("test5678");
logger.info("index filters of index test5678 = {}", indexFilters);
@ -52,7 +53,7 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
assertTrue(indexFilters.containsKey("d"));
assertTrue(indexFilters.containsKey("e"));
Map<String, String> aliases = client.getAliasFilters("test");
Map<String, String> aliases = client.getIndexFilters(client.resolveAlias("test"));
logger.info("aliases of alias test = {}", aliases);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
@ -60,15 +61,15 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
client.waitForResponses("30s");
assertFalse(client.hasThrowable());
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
client.close();
}
}
}

@ -18,14 +18,15 @@ import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class ExtendedTransportReplicaTest extends NodeTestUtils {
public class ReplicaTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportReplicaTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName());
@Test
public void testReplicaLevel() throws Exception {
@ -53,15 +54,15 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils {
try {
client.newIndex("test1", settingsTest1, new HashMap<>())
.newIndex("test2", settingsTest2, new HashMap<>());
client.waitForCluster("GREEN", "30s");
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
for (int i = 0; i < 1234; i++) {
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
for (int i = 0; i < 1234; i++) {
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test1");
client.refreshIndex("test2");
} catch (NoNodeAvailableException e) {
@ -96,12 +97,54 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils {
} catch (Exception e) {
logger.error("delete index failed, ignored. Reason:", e);
}
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
client.close();
}
}
@Test
public void testUpdateReplicaLevel() throws Exception {
long numberOfShards = 2;
int replicaLevel = 3;
// we need 3 nodes for replica level 3
startNode("2");
startNode("3");
int shardsAfterReplica;
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.build();
Settings settings = Settings.settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.build();
try {
client.newIndex("replicatest", settings, new HashMap<>());
client.waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
for (int i = 0; i < 12345; i++) {
client.index("replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.updateReplicaLevel("replicatest", replicaLevel, 30L, TimeUnit.SECONDS);
assertEquals(replicaLevel, client.getReplicaLevel("replicatest"));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -6,12 +6,14 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils {
public class SmokeTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportClientSingleNodeTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(SmokeTest.class.getSimpleName());
@Test
public void testSingleDocNodeClient() throws Exception {
@ -22,17 +24,17 @@ public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils {
try {
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertFalse(client.hasThrowable());
client.shutdown();
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 2.2.1.4
version = 2.2.1.5
xbib-metrics.version = 1.1.0
xbib-guice.version = 4.0.4

Loading…
Cancel
Save