align with es7102

Jörg Prante 3 years ago
parent f9f62fc56c
commit 6a4bb1efd5

4
.gitignore vendored

@ -4,6 +4,8 @@
/.idea /.idea
/target /target
.DS_Store .DS_Store
*.iml
*~
/.settings /.settings
/.classpath /.classpath
/.project /.project
@ -11,5 +13,3 @@
build build
out out
plugins plugins
*.iml
*~

@ -8,7 +8,7 @@ import java.util.Map;
*/ */
public interface AdminClient extends BasicClient { public interface AdminClient extends BasicClient {
Map<String, ?> getMapping(IndexDefinition indexDefinition); Map<String, Object> getMapping(IndexDefinition indexDefinition);
void checkMapping(IndexDefinition indexDefinition); void checkMapping(IndexDefinition indexDefinition);
@ -20,12 +20,11 @@ public interface AdminClient extends BasicClient {
AdminClient deleteIndex(IndexDefinition indexDefinition); AdminClient deleteIndex(IndexDefinition indexDefinition);
/** /**
* Update replica level. * Update replica level to the one in the index definition.
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param level the replica level
* @return this * @return this
*/ */
AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level); AdminClient updateReplicaLevel(IndexDefinition indexDefinition);
/** /**
* Get replica level. * Get replica level.

@ -3,13 +3,12 @@ package org.xbib.elx.api;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable { public interface BasicClient extends Closeable {
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);
/** /**
* Set an Elasticsearch client to extend from it. May be null for TransportClient. * Set an Elasticsearch client to extend from it. May be null for TransportClient.
@ -24,14 +23,7 @@ public interface BasicClient extends Closeable {
*/ */
ElasticsearchClient getClient(); ElasticsearchClient getClient();
/** void init(Settings settings);
* Initiative the extended client, the bulk metric and bulk controller,
* creates instances and connect to cluster, if required.
*
* @param settings settings
* @throws IOException if init fails
*/
void init(Settings settings) throws IOException;
/** /**
* Get cluster name. * Get cluster name.
@ -48,16 +40,7 @@ public interface BasicClient extends Closeable {
*/ */
String getHealthColor(long maxWaitTime, TimeUnit timeUnit); String getHealthColor(long maxWaitTime, TimeUnit timeUnit);
/** void waitForHealthyCluster();
* Wait for cluster being healthy.
*
* @param healthColor cluster health color to wait for
* @param maxWaitTime time value
* @param timeUnit time unit
*/
void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit);
void waitForShards(long maxWaitTime, TimeUnit timeUnit);
long getSearchableDocs(IndexDefinition indexDefinition); long getSearchableDocs(IndexDefinition indexDefinition);

@ -5,7 +5,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import java.io.Flushable; import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BulkClient extends BasicClient, Flushable { public interface BulkClient extends BasicClient, Flushable {
@ -13,24 +12,21 @@ public interface BulkClient extends BasicClient, Flushable {
/** /**
* Create a new index. * Create a new index.
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @throws IOException if settings/mapping is invalid or index creation fails
*/ */
void newIndex(IndexDefinition indexDefinition) throws IOException; void newIndex(IndexDefinition indexDefinition);
/** /**
* Start bulk mode for indexes. * Start bulk mode for indexes.
* @param indexDefinition index definition * @param indexDefinition index definition
* @throws IOException if bulk could not be started
*/ */
void startBulk(IndexDefinition indexDefinition) throws IOException; void startBulk(IndexDefinition indexDefinition);
/** /**
* Stop bulk mode. * Stop bulk mode.
* *
* @param indexDefinition index definition * @param indexDefinition index definition
* @throws IOException if bulk could not be startet
*/ */
void stopBulk(IndexDefinition indexDefinition) throws IOException; void stopBulk(IndexDefinition indexDefinition);
/** /**
* Add index request. Each request will be added to a queue for bulking requests. * Add index request. Each request will be added to a queue for bulking requests.
@ -69,7 +65,7 @@ public interface BulkClient extends BasicClient, Flushable {
* Delete request. * Delete request.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @return this * @return this
*/ */
BulkClient delete(IndexDefinition indexDefinition, String id); BulkClient delete(IndexDefinition indexDefinition, String id);
@ -89,7 +85,7 @@ public interface BulkClient extends BasicClient, Flushable {
* Note that updates only work correctly when all operations between nodes are synchronized. * Note that updates only work correctly when all operations between nodes are synchronized.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
*/ */
@ -98,8 +94,8 @@ public interface BulkClient extends BasicClient, Flushable {
/** /**
* Update document. Use with precaution! Does not work in all cases. * Update document. Use with precaution! Does not work in all cases.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
*/ */
@ -131,9 +127,8 @@ public interface BulkClient extends BasicClient, Flushable {
* @param value the new value * @param value the new value
* @param timeout timeout * @param timeout timeout
* @param timeUnit time unit * @param timeUnit time unit
* @throws IOException if update index setting failed
*/ */
void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit);
/** /**
* Refresh the index. * Refresh the index.

@ -4,17 +4,12 @@ import org.elasticsearch.action.ActionRequest;
import java.io.Closeable; import java.io.Closeable;
import java.io.Flushable; import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable { public interface BulkProcessor extends Closeable, Flushable {
void setEnabled(boolean enabled); void setEnabled(boolean enabled);
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void add(ActionRequest<?> request); void add(ActionRequest<?> request);
boolean waitForBulkResponses(long timeout, TimeUnit unit); boolean waitForBulkResponses(long timeout, TimeUnit unit);

@ -1,74 +1,75 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public interface IndexDefinition { public interface IndexDefinition {
IndexDefinition setIndex(String index); void setIndex(String index);
String getIndex(); String getIndex();
IndexDefinition setType(String type); void setType(String type);
String getType(); String getType();
IndexDefinition setFullIndexName(String fullIndexName); void setFullIndexName(String fullIndexName);
String getFullIndexName(); String getFullIndexName();
IndexDefinition setSettings(String settings); void setSettings(String settings);
String getSettings(); String getSettings();
IndexDefinition setMappings(String mappings); void setMappings(String mappings);
String getMappings(); String getMappings();
IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter); void setDateTimeFormatter(DateTimeFormatter formatter);
DateTimeFormatter getDateTimeFormatter(); DateTimeFormatter getDateTimeFormatter();
IndexDefinition setDateTimePattern(Pattern pattern); void setDateTimePattern(Pattern pattern);
Pattern getDateTimePattern(); Pattern getDateTimePattern();
IndexDefinition setStartBulkRefreshSeconds(int seconds); void setStartBulkRefreshSeconds(int seconds);
int getStartBulkRefreshSeconds(); int getStartBulkRefreshSeconds();
IndexDefinition setStopBulkRefreshSeconds(int seconds); void setStopBulkRefreshSeconds(int seconds);
int getStopBulkRefreshSeconds(); int getStopBulkRefreshSeconds();
IndexDefinition setEnabled(boolean enabled); void setEnabled(boolean enabled);
boolean isEnabled(); boolean isEnabled();
IndexDefinition setShift(boolean shift); void setShift(boolean shift);
boolean isShiftEnabled(); boolean isShiftEnabled();
IndexDefinition setPrune(boolean prune); void setPrune(boolean prune);
boolean isPruneEnabled(); boolean isPruneEnabled();
IndexDefinition setForceMerge(boolean forcemerge); void setForceMerge(boolean forcemerge);
boolean isForceMergeEnabled(); boolean isForceMergeEnabled();
IndexDefinition setReplicaLevel(int replicaLevel); void setShardCount(int shardCount);
int getReplicaLevel(); int getShardCount();
IndexDefinition setRetention(IndexRetention indexRetention); void setReplicaCount(int replicaLevel);
IndexRetention getRetention(); int getReplicaCount();
IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit); void setDelta(int delta);
long getMaxWaitTime(); int getDelta();
TimeUnit getMaxWaitTimeUnit(); void setMinToKeep(int minToKeep);
int getMinToKeep();
} }

@ -1,13 +0,0 @@
package org.xbib.elx.api;
public interface IndexRetention {
IndexRetention setDelta(int delta);
int getDelta();
IndexRetention setMinToKeep(int minToKeep);
int getMinToKeep();
}

@ -81,7 +81,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
@Override @Override
public Map<String, ?> getMapping(IndexDefinition indexDefinition) { public Map<String, Object> getMapping(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return null; return null;
} }
@ -102,33 +102,34 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public AdminClient deleteIndex(IndexDefinition indexDefinition) { public AdminClient deleteIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return null; return this;
} }
ensureClientIsPresent();
String index = indexDefinition.getFullIndexName(); String index = indexDefinition.getFullIndexName();
if (index == null) { if (index == null) {
logger.warn("no index name given to delete index"); logger.warn("no index name given to delete index");
return this; return this;
} }
ensureClientIsPresent();
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
return this; return this;
} }
@Override @Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) { public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return null; return null;
} }
if (level < 1) { if (indexDefinition.getReplicaCount() < 1) {
logger.warn("invalid replica level"); logger.warn("invalid replica level");
return this; return this;
} }
String index = indexDefinition.getFullIndexName(); logger.info("update replica level for " +
long maxWaitTime = indexDefinition.getMaxWaitTime(); indexDefinition + " to " + indexDefinition.getReplicaCount());
TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(),
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); 30L, TimeUnit.SECONDS);
waitForHealthyCluster();
return this; return this;
} }
@ -294,13 +295,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() && return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() &&
indexDefinition.getRetention() != null &&
indexDefinition.getDateTimePattern() != null ? indexDefinition.getDateTimePattern() != null ?
pruneIndex(indexDefinition.getIndex(), pruneIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(), indexDefinition.getFullIndexName(),
indexDefinition.getDateTimePattern(), indexDefinition.getDateTimePattern(),
indexDefinition.getRetention().getDelta(), indexDefinition.getDelta(),
indexDefinition.getRetention().getMinToKeep()) : new EmptyPruneResult(); indexDefinition.getMinToKeep()) : new EmptyPruneResult();
} }
private IndexPruneResult pruneIndex(String index, private IndexPruneResult pruneIndex(String index,
@ -409,7 +409,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (forceMergeResponse.getFailedShards() > 0) { if (forceMergeResponse.getFailedShards() > 0) {
throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
} }
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
return true; return true;
} }
@ -421,7 +421,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
.settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); .settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
} }
@Override @Override

@ -76,7 +76,7 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
this.settings = settings; this.settings = settings;
@ -105,13 +105,13 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
if (key == null) { if (key == null) {
throw new IOException("no key given"); throw new IllegalArgumentException("no key given");
} }
if (value == null) { if (value == null) {
throw new IOException("no value given"); throw new IllegalArgumentException("no value given");
} }
Settings.Builder updateSettingsBuilder = Settings.builder(); Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString()); updateSettingsBuilder.put(key, value.toString());
@ -138,8 +138,15 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { public void waitForHealthyCluster() {
ensureClientIsPresent(); ensureClientIsPresent();
String statusString = settings.get(Parameters.CLUSTER_TARGET_HEALTH.getName(),
Parameters.CLUSTER_TARGET_HEALTH.getString());
String waitTimeStr = settings.get(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(),
Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getString());
TimeValue timeValue = TimeValue.parseTimeValue(waitTimeStr, TimeValue.timeValueMinutes(30L), "");
long maxWaitTime = timeValue.minutes();
TimeUnit timeUnit = TimeUnit.MINUTES;
logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit); logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit);
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
@ -155,23 +162,6 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
} }
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent();
logger.info("waiting for cluster shard settling");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.waitForRelocatingShards(0)
.timeout(timeout);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards: " + timeout;
logger.error(message);
throw new IllegalStateException(message);
}
}
@Override @Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
@ -228,20 +218,20 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
} }
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; protected abstract ElasticsearchClient createClient(Settings settings);
protected abstract void closeClient(Settings settings) throws IOException; protected abstract void closeClient(Settings settings) throws IOException;
protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
if (index == null) { if (index == null) {
throw new IOException("no index name given"); throw new IllegalArgumentException("no index name given");
} }
if (key == null) { if (key == null) {
throw new IOException("no key given"); throw new IllegalArgumentException("no key given");
} }
if (value == null) { if (value == null) {
throw new IOException("no value given"); throw new IllegalArgumentException("no value given");
} }
Settings.Builder updateSettingsBuilder = Settings.builder(); Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString()); updateSettingsBuilder.put(key, value.toString());

@ -1,5 +1,6 @@
package org.xbib.elx.common; package org.xbib.elx.common;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
@ -12,6 +13,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -40,7 +42,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
if (closed.compareAndSet(true, false)) { if (closed.compareAndSet(true, false)) {
super.init(settings); super.init(settings);
bulkProcessor = new DefaultBulkProcessor(this, settings); bulkProcessor = new DefaultBulkProcessor(this, settings);
@ -73,7 +75,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public void newIndex(IndexDefinition indexDefinition) throws IOException { public void newIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
@ -86,26 +88,36 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
throw new IllegalArgumentException("no index type given"); throw new IllegalArgumentException("no index type given");
} }
ensureClientIsPresent(); ensureClientIsPresent();
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) CreateIndexRequestBuilder createIndexRequestBuilder =
new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
.setIndex(index); .setIndex(index);
if (indexDefinition.getSettings() == null) { if (indexDefinition.getSettings() != null) {
XContentBuilder builder = JsonXContent.contentBuilder() indexDefinition.setSettings(Strings.toString(Settings.builder()
.startObject() .loadFromSource(indexDefinition.getSettings())
.startObject("index") .put("index.number_of_shards", indexDefinition.getShardCount())
.field("number_of_shards", 1) .put("index.number_of_replicas", 0) // always 0
.field("number_of_replicas", 0) .build()));
.endObject() } else {
.endObject(); indexDefinition.setSettings(Strings.toString(Settings.builder()
indexDefinition.setSettings(builder.string()); .put("index.number_of_shards", indexDefinition.getShardCount())
} .put("index.number_of_replicas", 0) // always 0
Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); .build()));
createIndexRequestBuilder.setSettings(settings); }
createIndexRequestBuilder.setSettings(indexDefinition.getSettings());
if (indexDefinition.getMappings() != null) { if (indexDefinition.getMappings() != null) {
Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); try {
createIndexRequestBuilder.addMapping(type, mappings); Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
createIndexRequestBuilder.addMapping(type, mappings);
} catch (IOException e) {
logger.log(Level.WARN, e.getMessage(), e);
}
} else { } else {
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject(); try {
createIndexRequestBuilder.addMapping(type, builder); XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject();
createIndexRequestBuilder.addMapping(type, builder);
} catch (IOException e) {
logger.log(Level.WARN, e.getMessage(), e);
}
} }
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
if (createIndexResponse.isAcknowledged()) { if (createIndexResponse.isAcknowledged()) {
@ -114,11 +126,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
logger.warn("index creation of {} not acknowledged", index); logger.warn("index creation of {} not acknowledged", index);
return; return;
} }
waitForCluster("GREEN", 30L, TimeUnit.MINUTES); waitForHealthyCluster();
} }
@Override @Override
public void startBulk(IndexDefinition indexDefinition) throws IOException { public void startBulk(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
@ -133,18 +145,41 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
bulkQueueSize = 256L; bulkQueueSize = 256L;
} }
putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS);
bulkProcessor.startBulkMode(indexDefinition); String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStartBulkRefreshSeconds();
if (interval != 0) {
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
}
} }
} }
@Override @Override
public void stopBulk(IndexDefinition indexDefinition) throws IOException { public void stopBulk(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
if (bulkProcessor != null) { if (bulkProcessor != null) {
ensureClientIsPresent(); ensureClientIsPresent();
bulkProcessor.stopBulkMode(indexDefinition); String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStopBulkRefreshSeconds();
try {
bulkProcessor.flush();
} catch (IOException e) {
// can never happen
}
if (bulkProcessor.waitForBulkResponses(60L, TimeUnit.SECONDS)) {
if (interval != 0) {
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
}
}
} }
} }
@ -226,7 +261,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) {
super.updateIndexSetting(index, key, value, timeout, timeUnit); super.updateIndexSetting(index, key, value, timeout, timeUnit);
} }

@ -49,7 +49,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
if (closed.compareAndSet(true, false)) { if (closed.compareAndSet(true, false)) {
super.init(settings); super.init(settings);
this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); this.searchMetric = new DefaultSearchMetric(getScheduler(), settings);
@ -137,7 +137,6 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder); queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute(); ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc(); searchMetric.getCurrentQueries().inc();
SearchResponse initialSearchResponse = actionFuture.actionGet(); SearchResponse initialSearchResponse = actionFuture.actionGet();

@ -14,7 +14,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkClient;
import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -34,8 +33,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class); private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class);
private final BulkClient bulkClient;
private final AtomicBoolean enabled; private final AtomicBoolean enabled;
private final ElasticsearchClient client; private final ElasticsearchClient client;
@ -59,7 +56,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final int permits; private final int permits;
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
this.bulkClient = bulkClient;
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
@ -96,35 +92,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.enabled.set(enabled); this.enabled.set(enabled);
} }
@Override
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStartBulkRefreshSeconds();
if (interval != 0) {
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
bulkClient.updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
}
}
@Override
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStopBulkRefreshSeconds();
flush();
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
if (interval != 0) {
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
bulkClient.updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
}
}
}
@Override @Override
public void setMaxBulkActions(int bulkActions) { public void setMaxBulkActions(int bulkActions) {
this.bulkActions = bulkActions; this.bulkActions = bulkActions;
@ -205,7 +172,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
} }
drainSemaphore(0L, TimeUnit.NANOSECONDS); drainSemaphore(0L, TimeUnit.NANOSECONDS);
bulkListener.close(); bulkListener.close();
} catch (InterruptedException exc) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }

@ -1,14 +1,12 @@
package org.xbib.elx.common; package org.xbib.elx.common;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.yaml.YamlXContent; import org.elasticsearch.common.xcontent.yaml.YamlXContent;
import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.AdminClient;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexRetention;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -20,7 +18,6 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class DefaultIndexDefinition implements IndexDefinition { public class DefaultIndexDefinition implements IndexDefinition {
@ -47,42 +44,43 @@ public class DefaultIndexDefinition implements IndexDefinition {
private boolean forcemerge; private boolean forcemerge;
private int replicaLevel; private int shardCount;
private IndexRetention indexRetention; private int replicaCount;
private long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private int startRefreshInterval; private int startRefreshInterval;
private int stopRefreshInterval; private int stopRefreshInterval;
private int delta;
private int minToKeep;
public DefaultIndexDefinition(String index, String type) { public DefaultIndexDefinition(String index, String type) {
setIndex(index); setIndex(index);
setType(type); setType(type);
setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now()));
setMaxWaitTime(30, TimeUnit.SECONDS); setShardCount(1);
setShift(false); setShift(false);
setPrune(false); setPrune(false);
setForceMerge(false);
setEnabled(true); setEnabled(true);
} }
public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings)
throws IOException { throws IOException {
String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(),
Parameters.BULK_MAX_WAIT_RESPONSE.getString());
TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), "");
setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS);
String indexName = settings.get("name", index); String indexName = settings.get("name", index);
String indexType = settings.get("type", type); String indexType = settings.get("type", type);
boolean enabled = settings.getAsBoolean("enabled", true);
setIndex(indexName); setIndex(indexName);
setType(indexType); setType(indexType);
boolean enabled = settings.getAsBoolean("enabled", true);
setEnabled(enabled); setEnabled(enabled);
boolean forcemerge = settings.getAsBoolean("forcemerge", true);
setForceMerge(forcemerge);
setShardCount(settings.getAsInt("shards", 1));
setReplicaCount(settings.getAsInt("replicas", 1));
String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName);
setFullIndexName(fullIndexName); setFullIndexName(fullIndexName);
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(),
@ -92,7 +90,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
if (settings.get("settings") != null && settings.get("mapping") != null) { if (settings.get("settings") != null && settings.get("mapping") != null) {
setSettings(findSettingsFrom(settings.get("settings"))); setSettings(findSettingsFrom(settings.get("settings")));
setMappings(findMappingsFrom(settings.get("mapping"))); setMappings(findMappingsFrom(settings.get("mapping")));
setReplicaLevel(settings.getAsInt("replica", 0)); setReplicaCount(settings.getAsInt("replica", 0));
boolean shift = settings.getAsBoolean("shift", false); boolean shift = settings.getAsBoolean("shift", false);
setShift(shift); setShift(shift);
if (shift) { if (shift) {
@ -110,19 +108,16 @@ public class DefaultIndexDefinition implements IndexDefinition {
boolean prune = settings.getAsBoolean("prune", false); boolean prune = settings.getAsBoolean("prune", false);
setPrune(prune); setPrune(prune);
if (prune) { if (prune) {
IndexRetention indexRetention = new DefaultIndexRetention() setMinToKeep(settings.getAsInt("retention.mintokeep", 2));
.setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) setDelta(settings.getAsInt("retention.delta", 2));
.setDelta(settings.getAsInt("retention.delta", 0));
setRetention(indexRetention);
} }
} }
} }
} }
@Override @Override
public IndexDefinition setIndex(String index) { public void setIndex(String index) {
this.index = index; this.index = index;
return this;
} }
@Override @Override
@ -131,9 +126,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setType(String type) { public void setType(String type) {
this.type = type; this.type = type;
return this;
} }
@Override @Override
@ -143,9 +137,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
@Override @Override
public IndexDefinition setFullIndexName(String fullIndexName) { public void setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName; this.fullIndexName = fullIndexName;
return this;
} }
@Override @Override
@ -154,9 +147,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setSettings(String settings) { public void setSettings(String settings) {
this.settings = settings; this.settings = settings;
return this;
} }
@Override @Override
@ -165,9 +157,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setMappings(String mappings) { public void setMappings(String mappings) {
this.mappings = mappings; this.mappings = mappings;
return this;
} }
@Override @Override
@ -176,9 +167,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter) { public void setDateTimeFormatter(DateTimeFormatter formatter) {
this.formatter = formatter; this.formatter = formatter;
return this;
} }
@Override @Override
@ -187,9 +177,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setDateTimePattern(Pattern pattern) { public void setDateTimePattern(Pattern pattern) {
this.pattern = pattern; this.pattern = pattern;
return this;
} }
@Override @Override
@ -198,9 +187,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setStartBulkRefreshSeconds(int seconds) { public void setStartBulkRefreshSeconds(int seconds) {
this.startRefreshInterval = seconds; this.startRefreshInterval = seconds;
return this;
} }
@Override @Override
@ -209,9 +197,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setStopBulkRefreshSeconds(int seconds) { public void setStopBulkRefreshSeconds(int seconds) {
this.stopRefreshInterval = seconds; this.stopRefreshInterval = seconds;
return this;
} }
@Override @Override
@ -220,9 +207,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setEnabled(boolean enabled) { public void setEnabled(boolean enabled) {
this.enabled = enabled; this.enabled = enabled;
return this;
} }
@Override @Override
@ -231,9 +217,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setShift(boolean shift) { public void setShift(boolean shift) {
this.shift = shift; this.shift = shift;
return this;
} }
@Override @Override
@ -242,9 +227,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setPrune(boolean prune) { public void setPrune(boolean prune) {
this.prune = prune; this.prune = prune;
return this;
} }
@Override @Override
@ -253,9 +237,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setForceMerge(boolean forcemerge) { public void setForceMerge(boolean forcemerge) {
this.forcemerge = forcemerge; this.forcemerge = forcemerge;
return this;
} }
@Override @Override
@ -264,44 +247,44 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setReplicaLevel(int replicaLevel) { public void setShardCount(int shardCount) {
this.replicaLevel = replicaLevel; this.shardCount = shardCount;
return this;
} }
@Override @Override
public int getReplicaLevel() { public int getShardCount() {
return replicaLevel; return shardCount;
} }
@Override @Override
public IndexDefinition setRetention(IndexRetention indexRetention) { public void setReplicaCount(int replicaCount) {
this.indexRetention = indexRetention; this.replicaCount = replicaCount;
return this;
} }
@Override @Override
public IndexRetention getRetention() { public int getReplicaCount() {
return indexRetention; return replicaCount;
} }
@Override @Override
public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) { public void setDelta(int delta) {
this.maxWaitTime = maxWaitTime; this.delta = delta;
this.maxWaitTimeUnit = timeUnit;
return this;
} }
@Override @Override
public long getMaxWaitTime() { public int getDelta() {
return maxWaitTime; return delta;
} }
@Override @Override
public TimeUnit getMaxWaitTimeUnit() { public void setMinToKeep(int minToKeep) {
return maxWaitTimeUnit; this.minToKeep = minToKeep;
} }
@Override
public int getMinToKeep() {
return minToKeep;
}
private static String findSettingsFrom(String string) throws IOException { private static String findSettingsFrom(String string) throws IOException {
if (string == null) { if (string == null) {

@ -1,37 +0,0 @@
package org.xbib.elx.common;
import org.xbib.elx.api.IndexRetention;
public class DefaultIndexRetention implements IndexRetention {
private int delta;
private int minToKeep;
public DefaultIndexRetention() {
this.delta = 2;
this.minToKeep = 2;
}
@Override
public IndexRetention setDelta(int delta) {
this.delta = delta;
return this;
}
@Override
public int getDelta() {
return delta;
}
@Override
public IndexRetention setMinToKeep(int minToKeep) {
this.minToKeep = minToKeep;
return this;
}
@Override
public int getMinToKeep() {
return minToKeep;
}
}

@ -125,7 +125,11 @@ public class DefaultSearchMetric implements SearchMetric {
private void log() { private void log() {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("docs = " + getTotalQueries().getCount()); logger.info("queries = " + getTotalQueries().getCount() +
" succeeded = " + getSucceededQueries().getCount() +
" empty = " + getEmptyQueries().getCount() +
" failed = " + getFailedQueries() +
" timeouts = " + getTimeoutQueries().getCount());
} }
} }
} }

@ -3,8 +3,6 @@ package org.xbib.elx.common;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.TimeUnit;
/** /**
* A mocked client, it does not perform any actions on a cluster. Useful for testing. * A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/ */
@ -28,14 +26,6 @@ public class MockAdminClient extends AbstractAdminClient {
protected void closeClient(Settings settings) { protected void closeClient(Settings settings) {
} }
@Override
public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) {
}
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override @Override
public void close() { public void close() {
// nothing to do // nothing to do

@ -26,10 +26,6 @@ public class MockBulkClient extends AbstractBulkClient {
return null; return null;
} }
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { protected ElasticsearchClient createClient(Settings settings) {
return null; return null;

@ -3,8 +3,6 @@ package org.xbib.elx.common;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.TimeUnit;
/** /**
* A mocked client, it does not perform any actions on a cluster. Useful for testing. * A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/ */
@ -24,10 +22,6 @@ public class MockSearchClient extends AbstractSearchClient {
return null; return null;
} }
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override @Override
protected ElasticsearchClient createClient(Settings settings) { protected ElasticsearchClient createClient(Settings settings) {
return null; return null;

@ -2,6 +2,10 @@ package org.xbib.elx.common;
public enum Parameters { public enum Parameters {
CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"),
CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"),
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"),

@ -3,8 +3,6 @@ package org.xbib.elx.common.test;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
@ -62,7 +60,6 @@ class AliasTest {
long t1 = (System.nanoTime() - t0) / 1000000; long t1 = (System.nanoTime() - t0) / 1000000;
logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
assertTrue(t1 >= 0); assertTrue(t1 >= 0);
client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest());
} }
@Test @Test
@ -90,6 +87,7 @@ class AliasTest {
getAliasesRequest.aliases(alias); getAliasesRequest.aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
// reverse order
Set<String> result = new TreeSet<>(Collections.reverseOrder()); Set<String> result = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) { for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
Matcher m = pattern.matcher(indexName.value); Matcher m = pattern.matcher(indexName.value);

@ -35,7 +35,7 @@ class BulkClientTest {
void testNewIndex() throws Exception { void testNewIndex() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -46,7 +46,7 @@ class BulkClientTest {
void testSingleDoc() throws Exception { void testSingleDoc() throws Exception {
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -65,7 +65,7 @@ class BulkClientTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -93,7 +93,7 @@ class BulkClientTest {
long timeout = 120L; long timeout = 120L;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);

@ -37,7 +37,7 @@ class DuplicateIDTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");

@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.DefaultIndexRetention;
import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeAdminClientProvider;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
@ -40,11 +38,11 @@ class IndexPruneTest {
void testPrune() throws IOException { void testPrune() throws IOException {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");
@ -64,10 +62,8 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
IndexRetention indexRetention = new DefaultIndexRetention(); indexDefinition.setDelta(2);
indexRetention.setDelta(2); indexDefinition.setMinToKeep(2);
indexRetention.setMinToKeep(2);
indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);

@ -38,11 +38,11 @@ class IndexShiftTest {
void testIndexShift() throws Exception { void testIndexShift() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift"); indexDefinition.setFullIndexName("test_shift");

@ -40,7 +40,7 @@ class SearchTest {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition); bulkClient.startBulk(indexDefinition);
@ -60,7 +60,7 @@ class SearchTest {
} }
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client())
.setSearchClientProvider(NodeSearchClientProvider.class) .setSearchClientProvider(NodeSearchClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
// test stream count // test stream count
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb

@ -36,18 +36,17 @@ class SmokeTest {
void smokeTest() throws Exception { void smokeTest() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client())
.setAdminClientProvider(NodeAdminClientProvider.class) .setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build(); .build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
.setBulkClientProvider(NodeBulkClientProvider.class) .setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY);
assertEquals("test_smoke", indexDefinition.getIndex()); assertEquals("test_smoke", indexDefinition.getIndex());
assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke"));
assertEquals(0, indexDefinition.getReplicaLevel());
indexDefinition.setType("doc"); indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
@ -64,9 +63,8 @@ class SmokeTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.updateReplicaLevel(indexDefinition, 2); adminClient.updateReplicaLevel(indexDefinition);
int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {

@ -2,21 +2,12 @@ package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback;
@ -67,8 +58,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Objects.requireNonNull(helper); Objects.requireNonNull(helper);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode(); helper.startNode();
helper.greenHealth();
logger.info("cluser name = {}", helper.clusterName());
} }
@Override @Override
@ -137,11 +126,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return cluster; return cluster;
} }
Settings getNodeSettings() { Settings getClientSettings() {
return Settings.builder() return Settings.builder()
.put("name", "elx-client") // for threadpool name .put("name", getClusterName() + "-client-name") // for threadpool name
.put("cluster.name", getClusterName()) .put("cluster.name", getClusterName())
.put("path.home", getHome()) .put("path.home", getHome())
.put("node.name", getClusterName() + "-node")
.put("node.client", true)
.put("node.master", false)
.put("node.data", false)
.build(); .build();
} }
@ -170,8 +163,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Node buildNode() { Node buildNode() {
String id = "1"; String id = "1";
Settings nodeSettings = Settings.builder() Settings nodeSettings = Settings.builder()
.put(getNodeSettings()) .put("name", getClusterName() + "-server-name") // for threadpool name
.put("node.name", id) .put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("node.name", getClusterName() + "-node")
.put("node.client", false)
.put("node.master", true)
.put("node.data", true)
.build(); .build();
this.node = new MockNode(nodeSettings); this.node = new MockNode(nodeSettings);
return node; return node;
@ -184,31 +182,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
void closeNodes() { void closeNodes() {
if (node != null) { if (node != null) {
logger.info("closing all nodes"); logger.info("closing all nodes");
node.client().close();
node.close(); node.close();
} }
} }
void greenHealth() throws IOException {
try {
ClusterHealthResponse healthResponse = client().execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
}
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
}
String clusterName() {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
}
private static final Random random = new Random(); private static final Random random = new Random();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();

@ -5,8 +5,6 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractAdminClient; import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
/** /**
* Transport admin client. * Transport admin client.
*/ */
@ -20,12 +18,12 @@ public class TransportAdminClient extends AbstractAdminClient {
} }
@Override @Override
public ElasticsearchClient createClient(Settings settings) throws IOException { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings); return helper.createClient(settings);
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
super.init(settings); super.init(settings);
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
} }

@ -19,12 +19,12 @@ public class TransportBulkClient extends AbstractBulkClient {
} }
@Override @Override
public ElasticsearchClient createClient(Settings settings) throws IOException { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings); return helper.createClient(settings);
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
super.init(settings); super.init(settings);
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
} }

@ -46,7 +46,7 @@ public class TransportClientHelper {
} }
} }
public void init(TransportClient transportClient, Settings settings) throws IOException { public void init(TransportClient transportClient, Settings settings) {
Collection<TransportAddress> addrs = findAddresses(settings); Collection<TransportAddress> addrs = findAddresses(settings);
if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) {
throw new NoNodeAvailableException("no cluster nodes available, check settings = " throw new NoNodeAvailableException("no cluster nodes available, check settings = "
@ -54,7 +54,7 @@ public class TransportClientHelper {
} }
} }
private Collection<TransportAddress> findAddresses(Settings settings) throws IOException { private Collection<TransportAddress> findAddresses(Settings settings) {
final int defaultPort = settings.getAsInt("port", 9300); final int defaultPort = settings.getAsInt("port", 9300);
Collection<TransportAddress> addresses = new ArrayList<>(); Collection<TransportAddress> addresses = new ArrayList<>();
for (String hostname : settings.getAsArray("host")) { for (String hostname : settings.getAsArray("host")) {
@ -66,16 +66,20 @@ public class TransportClientHelper {
int port = Integer.parseInt(splitHost[1]); int port = Integer.parseInt(splitHost[1]);
TransportAddress address = new InetSocketTransportAddress(inetAddress, port); TransportAddress address = new InetSocketTransportAddress(inetAddress, port);
addresses.add(address); addresses.add(address);
} catch (NumberFormatException e) { } catch (IOException e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
} }
} else if (splitHost.length == 1) { } else if (splitHost.length == 1) {
String host = splitHost[0]; try {
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); String host = splitHost[0];
TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
addresses.add(address); TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort);
addresses.add(address);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
} else { } else {
throw new IOException("invalid hostname specification: " + hostname); throw new IllegalArgumentException("invalid hostname specification: " + hostname);
} }
} }
return addresses; return addresses;

@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractSearchClient; import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
/** /**
* Transport search client with additional methods. * Transport search client with additional methods.
@ -19,12 +18,12 @@ public class TransportSearchClient extends AbstractSearchClient {
} }
@Override @Override
public ElasticsearchClient createClient(Settings settings) throws IOException { public ElasticsearchClient createClient(Settings settings) {
return helper.createClient(settings); return helper.createClient(settings);
} }
@Override @Override
public void init(Settings settings) throws IOException { public void init(Settings settings) {
super.init(settings); super.init(settings);
helper.init((TransportClient) getClient(), settings); helper.init((TransportClient) getClient(), settings);
} }

@ -35,7 +35,7 @@ class BulkClientTest {
void testNewIndex() throws Exception { void testNewIndex() throws Exception {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -46,7 +46,7 @@ class BulkClientTest {
void testSingleDoc() throws Exception { void testSingleDoc() throws Exception {
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -65,7 +65,7 @@ class BulkClientTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -91,7 +91,7 @@ class BulkClientTest {
final long timeout = 120L; final long timeout = 120L;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test"); indexDefinition.setFullIndexName("test");

@ -32,7 +32,7 @@ class DuplicateIDTest {
long numactions = ACTIONS; long numactions = ACTIONS;
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);

@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.DefaultIndexRetention;
import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClient;
import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportAdminClientProvider;
import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClient;
@ -40,11 +38,11 @@ class IndexPruneTest {
void testPrune() throws IOException { void testPrune() throws IOException {
try (TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setIndex("test_prune"); indexDefinition.setIndex("test_prune");
@ -64,9 +62,9 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
IndexRetention indexRetention = new DefaultIndexRetention();
indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
indexDefinition.setDelta(2);
indexDefinition.setMinToKeep(2);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult); logger.info("prune result = " + indexPruneResult);

@ -40,11 +40,11 @@ class IndexShiftTest {
void testIndexShift() throws Exception { void testIndexShift() throws Exception {
try (TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_shift"); indexDefinition.setFullIndexName("test_shift");

@ -42,7 +42,7 @@ class SearchTest {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
try (TransportBulkClient bulkClient = ClientBuilder.builder() try (TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition); bulkClient.startBulk(indexDefinition);
@ -62,7 +62,7 @@ class SearchTest {
} }
try (TransportSearchClient searchClient = ClientBuilder.builder() try (TransportSearchClient searchClient = ClientBuilder.builder()
.setSearchClientProvider(TransportSearchClientProvider.class) .setSearchClientProvider(TransportSearchClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
// test stream count // test stream count
Stream<SearchHit> stream = searchClient.search(qb -> qb Stream<SearchHit> stream = searchClient.search(qb -> qb

@ -36,17 +36,16 @@ class SmokeTest {
void smokeTest() throws Exception { void smokeTest() throws Exception {
try (TransportAdminClient adminClient = ClientBuilder.builder() try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class) .setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build(); .build();
TransportBulkClient bulkClient = ClientBuilder.builder() TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class) .setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings()) .put(helper.getClientSettings())
.build()) { .build()) {
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY);
assertEquals("test", indexDefinition.getIndex()); assertEquals("test", indexDefinition.getIndex());
assertTrue(indexDefinition.getFullIndexName().startsWith("test")); assertTrue(indexDefinition.getFullIndexName().startsWith("test"));
assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
@ -62,9 +61,8 @@ class SmokeTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.updateReplicaLevel(indexDefinition, 2); adminClient.updateReplicaLevel(indexDefinition);
int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkProcessor().getLastBulkError() != null) { if (bulkClient.getBulkProcessor().getLastBulkError() != null) {

@ -2,21 +2,12 @@ package org.xbib.elx.transport.test;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback;
@ -68,8 +59,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Objects.requireNonNull(helper); Objects.requireNonNull(helper);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode(); helper.startNode();
helper.greenHealth();
logger.info("cluster name = {}", helper.clusterName());
} }
@Override @Override
@ -122,8 +111,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Node node; Node node;
AbstractClient client;
void setHome(String home) { void setHome(String home) {
this.home = home; this.home = home;
} }
@ -140,14 +127,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return cluster; return cluster;
} }
Settings getNodeSettings() { ElasticsearchClient client() {
return Settings.builder() return node.client();
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
} }
Settings getTransportSettings() { Settings getClientSettings() {
return Settings.builder() return Settings.builder()
.put("cluster.name", cluster) .put("cluster.name", cluster)
.put("path.home", getHome()) .put("path.home", getHome())
@ -159,7 +143,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
void startNode() { void startNode() {
buildNode().start(); buildNode().start();
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); NodesInfoResponse response = node.client().execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress() Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress(); .publishAddress();
if (obj instanceof InetSocketTransportAddress) { if (obj instanceof InetSocketTransportAddress) {
@ -180,48 +164,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
} }
Node buildNode() { Node buildNode() {
String id = "1"; node = new MockNode(Settings.builder()
Settings nodeSettings = Settings.builder() .put("name", getClusterName() + "-server-name") // for threadpool name
.put(getNodeSettings()) .put("cluster.name", getClusterName())
.put("node.name", id) .put("path.home", getHome())
.build(); .put("node.name", getClusterName() + "-node")
node = new MockNode(nodeSettings); .put("node.client", false)
client = (AbstractClient) node.client(); .put("node.master", true)
.put("node.data", true)
.build());
return node; return node;
} }
void closeNodes() { void closeNodes() {
if (client != null) {
logger.info("closing client");
client.close();
}
if (node != null) { if (node != null) {
logger.info("closing node"); logger.info("closing node");
node.client().close();
node.close(); node.close();
} }
} }
void greenHealth() throws IOException {
try {
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
}
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
}
String clusterName() {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
}
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final Random random = new SecureRandom(); private static final Random random = new SecureRandom();

Loading…
Cancel
Save