diff --git a/.gitignore b/.gitignore index 314748d..852078a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ /.idea /target .DS_Store +*.iml +*~ /.settings /.classpath /.project @@ -11,5 +13,3 @@ build out plugins -*.iml -*~ diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index 0e6dbe5..dbab173 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -8,7 +8,7 @@ import java.util.Map; */ public interface AdminClient extends BasicClient { - Map getMapping(IndexDefinition indexDefinition); + Map getMapping(IndexDefinition indexDefinition); void checkMapping(IndexDefinition indexDefinition); @@ -20,12 +20,11 @@ public interface AdminClient extends BasicClient { AdminClient deleteIndex(IndexDefinition indexDefinition); /** - * Update replica level. + * Update replica level to the one in the index definition. * @param indexDefinition the index definition - * @param level the replica level * @return this */ - AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level); + AdminClient updateReplicaLevel(IndexDefinition indexDefinition); /** * Get replica level. diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index fe0ead6..62b767b 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -3,13 +3,12 @@ package org.xbib.elx.api; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import java.io.Closeable; -import java.io.IOException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { - void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; + void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); /** * Set an Elasticsearch client to extend from it. May be null for TransportClient. @@ -24,14 +23,7 @@ public interface BasicClient extends Closeable { */ ElasticsearchClient getClient(); - /** - * Initiative the extended client, the bulk metric and bulk controller, - * creates instances and connect to cluster, if required. - * - * @param settings settings - * @throws IOException if init fails - */ - void init(Settings settings) throws IOException; + void init(Settings settings); /** * Get cluster name. @@ -48,16 +40,7 @@ public interface BasicClient extends Closeable { */ String getHealthColor(long maxWaitTime, TimeUnit timeUnit); - /** - * Wait for cluster being healthy. - * - * @param healthColor cluster health color to wait for - * @param maxWaitTime time value - * @param timeUnit time unit - */ - void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); - - void waitForShards(long maxWaitTime, TimeUnit timeUnit); + void waitForHealthyCluster(); long getSearchableDocs(IndexDefinition indexDefinition); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 13920db..5c9ce79 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -5,7 +5,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.bytes.BytesReference; import java.io.Flushable; -import java.io.IOException; import java.util.concurrent.TimeUnit; public interface BulkClient extends BasicClient, Flushable { @@ -13,24 +12,21 @@ public interface BulkClient extends BasicClient, Flushable { /** * Create a new index. * @param indexDefinition the index definition - * @throws IOException if settings/mapping is invalid or index creation fails */ - void newIndex(IndexDefinition indexDefinition) throws IOException; + void newIndex(IndexDefinition indexDefinition); /** * Start bulk mode for indexes. * @param indexDefinition index definition - * @throws IOException if bulk could not be started */ - void startBulk(IndexDefinition indexDefinition) throws IOException; + void startBulk(IndexDefinition indexDefinition); /** * Stop bulk mode. * * @param indexDefinition index definition - * @throws IOException if bulk could not be startet */ - void stopBulk(IndexDefinition indexDefinition) throws IOException; + void stopBulk(IndexDefinition indexDefinition); /** * Add index request. Each request will be added to a queue for bulking requests. @@ -69,7 +65,7 @@ public interface BulkClient extends BasicClient, Flushable { * Delete request. * * @param indexDefinition the index definition - * @param id the id + * @param id the id * @return this */ BulkClient delete(IndexDefinition indexDefinition, String id); @@ -89,7 +85,7 @@ public interface BulkClient extends BasicClient, Flushable { * Note that updates only work correctly when all operations between nodes are synchronized. * * @param indexDefinition the index definition - * @param id the id + * @param id the id * @param source the source * @return this */ @@ -98,8 +94,8 @@ public interface BulkClient extends BasicClient, Flushable { /** * Update document. Use with precaution! Does not work in all cases. * - * @param indexDefinition the index definition - * @param id the id + * @param indexDefinition the index definition + * @param id the id * @param source the source * @return this */ @@ -131,9 +127,8 @@ public interface BulkClient extends BasicClient, Flushable { * @param value the new value * @param timeout timeout * @param timeUnit time unit - * @throws IOException if update index setting failed */ - void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; + void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit); /** * Refresh the index. diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index f4508b0..b177b41 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -4,17 +4,12 @@ import org.elasticsearch.action.ActionRequest; import java.io.Closeable; import java.io.Flushable; -import java.io.IOException; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { void setEnabled(boolean enabled); - void startBulkMode(IndexDefinition indexDefinition) throws IOException; - - void stopBulkMode(IndexDefinition indexDefinition) throws IOException; - void add(ActionRequest request); boolean waitForBulkResponses(long timeout, TimeUnit unit); diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 00946b8..c6f669c 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -1,74 +1,75 @@ package org.xbib.elx.api; import java.time.format.DateTimeFormatter; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; public interface IndexDefinition { - IndexDefinition setIndex(String index); + void setIndex(String index); String getIndex(); - IndexDefinition setType(String type); + void setType(String type); String getType(); - IndexDefinition setFullIndexName(String fullIndexName); + void setFullIndexName(String fullIndexName); String getFullIndexName(); - IndexDefinition setSettings(String settings); + void setSettings(String settings); String getSettings(); - IndexDefinition setMappings(String mappings); + void setMappings(String mappings); String getMappings(); - IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter); + void setDateTimeFormatter(DateTimeFormatter formatter); DateTimeFormatter getDateTimeFormatter(); - IndexDefinition setDateTimePattern(Pattern pattern); + void setDateTimePattern(Pattern pattern); Pattern getDateTimePattern(); - IndexDefinition setStartBulkRefreshSeconds(int seconds); + void setStartBulkRefreshSeconds(int seconds); int getStartBulkRefreshSeconds(); - IndexDefinition setStopBulkRefreshSeconds(int seconds); + void setStopBulkRefreshSeconds(int seconds); int getStopBulkRefreshSeconds(); - IndexDefinition setEnabled(boolean enabled); + void setEnabled(boolean enabled); boolean isEnabled(); - IndexDefinition setShift(boolean shift); + void setShift(boolean shift); boolean isShiftEnabled(); - IndexDefinition setPrune(boolean prune); + void setPrune(boolean prune); boolean isPruneEnabled(); - IndexDefinition setForceMerge(boolean forcemerge); + void setForceMerge(boolean forcemerge); boolean isForceMergeEnabled(); - IndexDefinition setReplicaLevel(int replicaLevel); + void setShardCount(int shardCount); - int getReplicaLevel(); + int getShardCount(); - IndexDefinition setRetention(IndexRetention indexRetention); + void setReplicaCount(int replicaLevel); - IndexRetention getRetention(); + int getReplicaCount(); - IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit); + void setDelta(int delta); - long getMaxWaitTime(); + int getDelta(); - TimeUnit getMaxWaitTimeUnit(); + void setMinToKeep(int minToKeep); + + int getMinToKeep(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java b/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java deleted file mode 100644 index 44116e2..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.xbib.elx.api; - -public interface IndexRetention { - - IndexRetention setDelta(int delta); - - int getDelta(); - - IndexRetention setMinToKeep(int minToKeep); - - int getMinToKeep(); - -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 8ad0db5..05711e6 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -81,7 +81,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); @Override - public Map getMapping(IndexDefinition indexDefinition) { + public Map getMapping(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return null; } @@ -102,33 +102,34 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { - return null; + return this; } - ensureClientIsPresent(); String index = indexDefinition.getFullIndexName(); if (index == null) { logger.warn("no index name given to delete index"); return this; } + ensureClientIsPresent(); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); return this; } @Override - public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) { + public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return null; } - if (level < 1) { + if (indexDefinition.getReplicaCount() < 1) { logger.warn("invalid replica level"); return this; } - String index = indexDefinition.getFullIndexName(); - long maxWaitTime = indexDefinition.getMaxWaitTime(); - TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); - updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); + logger.info("update replica level for " + + indexDefinition + " to " + indexDefinition.getReplicaCount()); + updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", indexDefinition.getReplicaCount(), + 30L, TimeUnit.SECONDS); + waitForHealthyCluster(); return this; } @@ -294,13 +295,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() && - indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null ? pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), indexDefinition.getDateTimePattern(), - indexDefinition.getRetention().getDelta(), - indexDefinition.getRetention().getMinToKeep()) : new EmptyPruneResult(); + indexDefinition.getDelta(), + indexDefinition.getMinToKeep()) : new EmptyPruneResult(); } private IndexPruneResult pruneIndex(String index, @@ -409,7 +409,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (forceMergeResponse.getFailedShards() > 0) { throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); } - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); return true; } @@ -421,7 +421,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) .settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index b1429f5..1142f6c 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -76,7 +76,7 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { if (closed.compareAndSet(false, true)) { logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); this.settings = settings; @@ -105,13 +105,13 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) { ensureClientIsPresent(); if (key == null) { - throw new IOException("no key given"); + throw new IllegalArgumentException("no key given"); } if (value == null) { - throw new IOException("no value given"); + throw new IllegalArgumentException("no value given"); } Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); @@ -138,8 +138,15 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { + public void waitForHealthyCluster() { ensureClientIsPresent(); + String statusString = settings.get(Parameters.CLUSTER_TARGET_HEALTH.getName(), + Parameters.CLUSTER_TARGET_HEALTH.getString()); + String waitTimeStr = settings.get(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), + Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getString()); + TimeValue timeValue = TimeValue.parseTimeValue(waitTimeStr, TimeValue.timeValueMinutes(30L), ""); + long maxWaitTime = timeValue.minutes(); + TimeUnit timeUnit = TimeUnit.MINUTES; logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); @@ -155,23 +162,6 @@ public abstract class AbstractBasicClient implements BasicClient { } } - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - ensureClientIsPresent(); - logger.info("waiting for cluster shard settling"); - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - .waitForRelocatingShards(0) - .timeout(timeout); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse.isTimedOut()) { - String message = "timeout waiting for cluster shards: " + timeout; - logger.error(message); - throw new IllegalStateException(message); - } - } - @Override public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); @@ -228,20 +218,20 @@ public abstract class AbstractBasicClient implements BasicClient { } } - protected abstract ElasticsearchClient createClient(Settings settings) throws IOException; + protected abstract ElasticsearchClient createClient(Settings settings); protected abstract void closeClient(Settings settings) throws IOException; - protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) { ensureClientIsPresent(); if (index == null) { - throw new IOException("no index name given"); + throw new IllegalArgumentException("no index name given"); } if (key == null) { - throw new IOException("no key given"); + throw new IllegalArgumentException("no key given"); } if (value == null) { - throw new IOException("no value given"); + throw new IllegalArgumentException("no value given"); } Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 14c6ffc..3fcd849 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -1,5 +1,6 @@ package org.xbib.elx.common; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; @@ -12,6 +13,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -40,7 +42,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { if (closed.compareAndSet(true, false)) { super.init(settings); bulkProcessor = new DefaultBulkProcessor(this, settings); @@ -73,7 +75,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void newIndex(IndexDefinition indexDefinition) throws IOException { + public void newIndex(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return; } @@ -86,26 +88,36 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements throw new IllegalArgumentException("no index type given"); } ensureClientIsPresent(); - CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) + CreateIndexRequestBuilder createIndexRequestBuilder = + new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) .setIndex(index); - if (indexDefinition.getSettings() == null) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - indexDefinition.setSettings(builder.string()); - } - Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); - createIndexRequestBuilder.setSettings(settings); - if (indexDefinition.getMappings() != null) { - Map mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); - createIndexRequestBuilder.addMapping(type, mappings); + if (indexDefinition.getSettings() != null) { + indexDefinition.setSettings(Strings.toString(Settings.builder() + .loadFromSource(indexDefinition.getSettings()) + .put("index.number_of_shards", indexDefinition.getShardCount()) + .put("index.number_of_replicas", 0) // always 0 + .build())); } else { - XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject(); - createIndexRequestBuilder.addMapping(type, builder); + indexDefinition.setSettings(Strings.toString(Settings.builder() + .put("index.number_of_shards", indexDefinition.getShardCount()) + .put("index.number_of_replicas", 0) // always 0 + .build())); + } + createIndexRequestBuilder.setSettings(indexDefinition.getSettings()); + if (indexDefinition.getMappings() != null) { + try { + Map mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); + createIndexRequestBuilder.addMapping(type, mappings); + } catch (IOException e) { + logger.log(Level.WARN, e.getMessage(), e); + } + } else { + try { + XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject(); + createIndexRequestBuilder.addMapping(type, builder); + } catch (IOException e) { + logger.log(Level.WARN, e.getMessage(), e); + } } CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); if (createIndexResponse.isAcknowledged()) { @@ -114,11 +126,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements logger.warn("index creation of {} not acknowledged", index); return; } - waitForCluster("GREEN", 30L, TimeUnit.MINUTES); + waitForHealthyCluster(); } @Override - public void startBulk(IndexDefinition indexDefinition) throws IOException { + public void startBulk(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return; } @@ -133,18 +145,41 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements bulkQueueSize = 256L; } putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); - bulkProcessor.startBulkMode(indexDefinition); + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStartBulkRefreshSeconds(); + if (interval != 0) { + logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); + updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); + } } } @Override - public void stopBulk(IndexDefinition indexDefinition) throws IOException { + public void stopBulk(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return; } if (bulkProcessor != null) { ensureClientIsPresent(); - bulkProcessor.stopBulkMode(indexDefinition); + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStopBulkRefreshSeconds(); + try { + bulkProcessor.flush(); + } catch (IOException e) { + // can never happen + } + if (bulkProcessor.waitForBulkResponses(60L, TimeUnit.SECONDS)) { + if (interval != 0) { + logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); + updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); + } + } } } @@ -226,7 +261,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) { super.updateIndexSetting(index, key, value, timeout, timeUnit); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 86ede0f..3d118c9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -49,7 +49,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { if (closed.compareAndSet(true, false)) { super.init(settings); this.searchMetric = new DefaultSearchMetric(getScheduler(), settings); @@ -137,7 +137,6 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); - searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); ActionFuture actionFuture = searchRequestBuilder.execute(); searchMetric.getCurrentQueries().inc(); SearchResponse initialSearchResponse = actionFuture.actionGet(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 2b4dbbf..4cb2db0 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; -import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.util.concurrent.ScheduledFuture; @@ -34,8 +33,6 @@ public class DefaultBulkProcessor implements BulkProcessor { private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class); - private final BulkClient bulkClient; - private final AtomicBoolean enabled; private final ElasticsearchClient client; @@ -59,7 +56,6 @@ public class DefaultBulkProcessor implements BulkProcessor { private final int permits; public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { - this.bulkClient = bulkClient; int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), @@ -96,35 +92,6 @@ public class DefaultBulkProcessor implements BulkProcessor { this.enabled.set(enabled); } - @Override - public void startBulkMode(IndexDefinition indexDefinition) throws IOException { - String indexName = indexDefinition.getFullIndexName(); - int interval = indexDefinition.getStartBulkRefreshSeconds(); - if (interval != 0) { - logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", - interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); - } else { - logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); - } - } - - @Override - public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { - String indexName = indexDefinition.getFullIndexName(); - int interval = indexDefinition.getStopBulkRefreshSeconds(); - flush(); - if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { - if (interval != 0) { - logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", - interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); - } else { - logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); - } - } - } - @Override public void setMaxBulkActions(int bulkActions) { this.bulkActions = bulkActions; @@ -205,7 +172,7 @@ public class DefaultBulkProcessor implements BulkProcessor { } drainSemaphore(0L, TimeUnit.NANOSECONDS); bulkListener.close(); - } catch (InterruptedException exc) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index 8b3ce52..8691bb9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -1,14 +1,12 @@ package org.xbib.elx.common; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.yaml.YamlXContent; import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.IndexDefinition; -import org.xbib.elx.api.IndexRetention; import java.io.IOException; import java.io.InputStream; @@ -20,7 +18,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; public class DefaultIndexDefinition implements IndexDefinition { @@ -47,42 +44,43 @@ public class DefaultIndexDefinition implements IndexDefinition { private boolean forcemerge; - private int replicaLevel; + private int shardCount; - private IndexRetention indexRetention; - - private long maxWaitTime; - - private TimeUnit maxWaitTimeUnit; + private int replicaCount; private int startRefreshInterval; private int stopRefreshInterval; + private int delta; + + private int minToKeep; + public DefaultIndexDefinition(String index, String type) { setIndex(index); setType(type); setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); - setMaxWaitTime(30, TimeUnit.SECONDS); + setShardCount(1); setShift(false); setPrune(false); + setForceMerge(false); setEnabled(true); } public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) throws IOException { - String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(), - Parameters.BULK_MAX_WAIT_RESPONSE.getString()); - TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), ""); - setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); String indexName = settings.get("name", index); String indexType = settings.get("type", type); - boolean enabled = settings.getAsBoolean("enabled", true); setIndex(indexName); setType(indexType); + boolean enabled = settings.getAsBoolean("enabled", true); setEnabled(enabled); + boolean forcemerge = settings.getAsBoolean("forcemerge", true); + setForceMerge(forcemerge); + setShardCount(settings.getAsInt("shards", 1)); + setReplicaCount(settings.getAsInt("replicas", 1)); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); setFullIndexName(fullIndexName); setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), @@ -92,7 +90,7 @@ public class DefaultIndexDefinition implements IndexDefinition { if (settings.get("settings") != null && settings.get("mapping") != null) { setSettings(findSettingsFrom(settings.get("settings"))); setMappings(findMappingsFrom(settings.get("mapping"))); - setReplicaLevel(settings.getAsInt("replica", 0)); + setReplicaCount(settings.getAsInt("replica", 0)); boolean shift = settings.getAsBoolean("shift", false); setShift(shift); if (shift) { @@ -110,19 +108,16 @@ public class DefaultIndexDefinition implements IndexDefinition { boolean prune = settings.getAsBoolean("prune", false); setPrune(prune); if (prune) { - IndexRetention indexRetention = new DefaultIndexRetention() - .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) - .setDelta(settings.getAsInt("retention.delta", 0)); - setRetention(indexRetention); + setMinToKeep(settings.getAsInt("retention.mintokeep", 2)); + setDelta(settings.getAsInt("retention.delta", 2)); } } } } @Override - public IndexDefinition setIndex(String index) { + public void setIndex(String index) { this.index = index; - return this; } @Override @@ -131,9 +126,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setType(String type) { + public void setType(String type) { this.type = type; - return this; } @Override @@ -143,9 +137,8 @@ public class DefaultIndexDefinition implements IndexDefinition { @Override - public IndexDefinition setFullIndexName(String fullIndexName) { + public void setFullIndexName(String fullIndexName) { this.fullIndexName = fullIndexName; - return this; } @Override @@ -154,9 +147,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setSettings(String settings) { + public void setSettings(String settings) { this.settings = settings; - return this; } @Override @@ -165,9 +157,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setMappings(String mappings) { + public void setMappings(String mappings) { this.mappings = mappings; - return this; } @Override @@ -176,9 +167,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter) { + public void setDateTimeFormatter(DateTimeFormatter formatter) { this.formatter = formatter; - return this; } @Override @@ -187,9 +177,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setDateTimePattern(Pattern pattern) { + public void setDateTimePattern(Pattern pattern) { this.pattern = pattern; - return this; } @Override @@ -198,9 +187,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setStartBulkRefreshSeconds(int seconds) { + public void setStartBulkRefreshSeconds(int seconds) { this.startRefreshInterval = seconds; - return this; } @Override @@ -209,9 +197,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setStopBulkRefreshSeconds(int seconds) { + public void setStopBulkRefreshSeconds(int seconds) { this.stopRefreshInterval = seconds; - return this; } @Override @@ -220,9 +207,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setEnabled(boolean enabled) { + public void setEnabled(boolean enabled) { this.enabled = enabled; - return this; } @Override @@ -231,9 +217,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setShift(boolean shift) { + public void setShift(boolean shift) { this.shift = shift; - return this; } @Override @@ -242,9 +227,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setPrune(boolean prune) { + public void setPrune(boolean prune) { this.prune = prune; - return this; } @Override @@ -253,9 +237,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setForceMerge(boolean forcemerge) { + public void setForceMerge(boolean forcemerge) { this.forcemerge = forcemerge; - return this; } @Override @@ -264,44 +247,44 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setReplicaLevel(int replicaLevel) { - this.replicaLevel = replicaLevel; - return this; + public void setShardCount(int shardCount) { + this.shardCount = shardCount; } @Override - public int getReplicaLevel() { - return replicaLevel; + public int getShardCount() { + return shardCount; } @Override - public IndexDefinition setRetention(IndexRetention indexRetention) { - this.indexRetention = indexRetention; - return this; + public void setReplicaCount(int replicaCount) { + this.replicaCount = replicaCount; } @Override - public IndexRetention getRetention() { - return indexRetention; + public int getReplicaCount() { + return replicaCount; } @Override - public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) { - this.maxWaitTime = maxWaitTime; - this.maxWaitTimeUnit = timeUnit; - return this; + public void setDelta(int delta) { + this.delta = delta; } @Override - public long getMaxWaitTime() { - return maxWaitTime; + public int getDelta() { + return delta; } @Override - public TimeUnit getMaxWaitTimeUnit() { - return maxWaitTimeUnit; + public void setMinToKeep(int minToKeep) { + this.minToKeep = minToKeep; } + @Override + public int getMinToKeep() { + return minToKeep; + } private static String findSettingsFrom(String string) throws IOException { if (string == null) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java deleted file mode 100644 index 71a7421..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.xbib.elx.common; - -import org.xbib.elx.api.IndexRetention; - -public class DefaultIndexRetention implements IndexRetention { - - private int delta; - - private int minToKeep; - - public DefaultIndexRetention() { - this.delta = 2; - this.minToKeep = 2; - } - - @Override - public IndexRetention setDelta(int delta) { - this.delta = delta; - return this; - } - - @Override - public int getDelta() { - return delta; - } - - @Override - public IndexRetention setMinToKeep(int minToKeep) { - this.minToKeep = minToKeep; - return this; - } - - @Override - public int getMinToKeep() { - return minToKeep; - } -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index cde8c5f..48b4d99 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -125,7 +125,11 @@ public class DefaultSearchMetric implements SearchMetric { private void log() { if (logger.isInfoEnabled()) { - logger.info("docs = " + getTotalQueries().getCount()); + logger.info("queries = " + getTotalQueries().getCount() + + " succeeded = " + getSucceededQueries().getCount() + + " empty = " + getEmptyQueries().getCount() + + " failed = " + getFailedQueries() + + " timeouts = " + getTimeoutQueries().getCount()); } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index 6e25fef..66619fd 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -3,8 +3,6 @@ package org.xbib.elx.common; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import java.util.concurrent.TimeUnit; - /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ @@ -28,14 +26,6 @@ public class MockAdminClient extends AbstractAdminClient { protected void closeClient(Settings settings) { } - @Override - public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) { - } - - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } - @Override public void close() { // nothing to do diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index e0e861b..0856fe5 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -26,10 +26,6 @@ public class MockBulkClient extends AbstractBulkClient { return null; } - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } - @Override protected ElasticsearchClient createClient(Settings settings) { return null; diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java index 9d182dc..82be6ef 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java @@ -3,8 +3,6 @@ package org.xbib.elx.common; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import java.util.concurrent.TimeUnit; - /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ @@ -24,10 +22,6 @@ public class MockSearchClient extends AbstractSearchClient { return null; } - @Override - public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - } - @Override protected ElasticsearchClient createClient(Settings settings) { return null; diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 1dd55bc..30a3d9e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,6 +2,10 @@ package org.xbib.elx.common; public enum Parameters { + CLUSTER_TARGET_HEALTH("cluster.target_health", String.class, "GREEN"), + + CLUSTER_TARGET_HEALTH_TIMEOUT("cluster.target_health_timeout", String.class, "30m"), + DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java index 6444819..0aaefbf 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java @@ -3,8 +3,6 @@ package org.xbib.elx.common.test; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; @@ -62,7 +60,6 @@ class AliasTest { long t1 = (System.nanoTime() - t0) / 1000000; logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1); assertTrue(t1 >= 0); - client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest()); } @Test @@ -90,6 +87,7 @@ class AliasTest { getAliasesRequest.aliases(alias); GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); + // reverse order Set result = new TreeSet<>(Collections.reverseOrder()); for (ObjectCursor indexName : getAliasesResponse.getAliases().keys()) { Matcher m = pattern.matcher(indexName.value); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 6de3be9..16b3893 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -35,7 +35,7 @@ class BulkClientTest { void testNewIndex() throws Exception { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -46,7 +46,7 @@ class BulkClientTest { void testSingleDoc() throws Exception { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -65,7 +65,7 @@ class BulkClientTest { long numactions = ACTIONS; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -93,7 +93,7 @@ class BulkClientTest { long timeout = 120L; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index b9f3e02..d5241aa 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -37,7 +37,7 @@ class DuplicateIDTest { long numactions = ACTIONS; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 5560139..0610c79 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; @@ -40,11 +38,11 @@ class IndexPruneTest { void testPrune() throws IOException { try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); @@ -64,10 +62,8 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); - IndexRetention indexRetention = new DefaultIndexRetention(); - indexRetention.setDelta(2); - indexRetention.setMinToKeep(2); - indexDefinition.setRetention(indexRetention); + indexDefinition.setDelta(2); + indexDefinition.setMinToKeep(2); indexDefinition.setEnabled(true); indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 49545df..5538f2e 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -38,11 +38,11 @@ class IndexShiftTest { void testIndexShift() throws Exception { try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 6fa4b2b..0e2abf5 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -40,7 +40,7 @@ class SearchTest { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -60,7 +60,7 @@ class SearchTest { } try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) .setSearchClientProvider(NodeSearchClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { // test stream count Stream stream = searchClient.search(qb -> qb diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 1a40185..e6e5d7d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -36,18 +36,17 @@ class SmokeTest { void smokeTest() throws Exception { try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build(); NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getClientSettings()) .build()) { assertEquals(helper.getClusterName(), adminClient.getClusterName()); IndexDefinition indexDefinition = new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); assertEquals("test_smoke", indexDefinition.getIndex()); assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); - assertEquals(0, indexDefinition.getReplicaLevel()); indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -64,9 +63,8 @@ class SmokeTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.updateReplicaLevel(indexDefinition, 2); - int replica = adminClient.getReplicaLevel(indexDefinition); - assertEquals(2, replica); + adminClient.updateReplicaLevel(indexDefinition); + assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().getLastBulkError() != null) { diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index 16fa2e0..5f89943 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -2,21 +2,12 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -67,8 +58,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Objects.requireNonNull(helper); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode(); - helper.greenHealth(); - logger.info("cluser name = {}", helper.clusterName()); } @Override @@ -137,11 +126,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings() { + Settings getClientSettings() { return Settings.builder() - .put("name", "elx-client") // for threadpool name + .put("name", getClusterName() + "-client-name") // for threadpool name .put("cluster.name", getClusterName()) .put("path.home", getHome()) + .put("node.name", getClusterName() + "-node") + .put("node.client", true) + .put("node.master", false) + .put("node.data", false) .build(); } @@ -170,8 +163,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Node buildNode() { String id = "1"; Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) - .put("node.name", id) + .put("name", getClusterName() + "-server-name") // for threadpool name + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("node.name", getClusterName() + "-node") + .put("node.client", false) + .put("node.master", true) + .put("node.data", true) .build(); this.node = new MockNode(nodeSettings); return node; @@ -184,31 +182,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft void closeNodes() { if (node != null) { logger.info("closing all nodes"); + node.client().close(); node.close(); } } - void greenHealth() throws IOException { - try { - ClusterHealthResponse healthResponse = client().execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); - } - } - - String clusterName() { - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); - ClusterStateResponse clusterStateResponse = - client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - return clusterStateResponse.getClusterName().value(); - } - private static final Random random = new Random(); private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index 08b3cbc..ff5a88b 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -5,8 +5,6 @@ import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractAdminClient; -import java.io.IOException; - /** * Transport admin client. */ @@ -20,12 +18,12 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings); } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { super.init(settings); helper.init((TransportClient) getClient(), settings); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index b591fc2..c9a3f36 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -19,12 +19,12 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings); } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { super.init(settings); helper.init((TransportClient) getClient(), settings); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 08ee421..9ce152d 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -46,7 +46,7 @@ public class TransportClientHelper { } } - public void init(TransportClient transportClient, Settings settings) throws IOException { + public void init(TransportClient transportClient, Settings settings) { Collection addrs = findAddresses(settings); if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { throw new NoNodeAvailableException("no cluster nodes available, check settings = " @@ -54,7 +54,7 @@ public class TransportClientHelper { } } - private Collection findAddresses(Settings settings) throws IOException { + private Collection findAddresses(Settings settings) { final int defaultPort = settings.getAsInt("port", 9300); Collection addresses = new ArrayList<>(); for (String hostname : settings.getAsArray("host")) { @@ -66,16 +66,20 @@ public class TransportClientHelper { int port = Integer.parseInt(splitHost[1]); TransportAddress address = new InetSocketTransportAddress(inetAddress, port); addresses.add(address); - } catch (NumberFormatException e) { + } catch (IOException e) { logger.warn(e.getMessage(), e); } } else if (splitHost.length == 1) { - String host = splitHost[0]; - InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); - TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); - addresses.add(address); + try { + String host = splitHost[0]; + InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); + TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); + addresses.add(address); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } } else { - throw new IOException("invalid hostname specification: " + hostname); + throw new IllegalArgumentException("invalid hostname specification: " + hostname); } } return addresses; diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index 53bc8a7..eb39534 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -4,7 +4,6 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.common.AbstractSearchClient; -import java.io.IOException; /** * Transport search client with additional methods. @@ -19,12 +18,12 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - public ElasticsearchClient createClient(Settings settings) throws IOException { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings); } @Override - public void init(Settings settings) throws IOException { + public void init(Settings settings) { super.init(settings); helper.init((TransportClient) getClient(), settings); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 76fd9a3..c122e0d 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -35,7 +35,7 @@ class BulkClientTest { void testNewIndex() throws Exception { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -46,7 +46,7 @@ class BulkClientTest { void testSingleDoc() throws Exception { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -65,7 +65,7 @@ class BulkClientTest { long numactions = ACTIONS; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -91,7 +91,7 @@ class BulkClientTest { final long timeout = 120L; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test"); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 437850c..4643a9a 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -32,7 +32,7 @@ class DuplicateIDTest { long numactions = ACTIONS; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index ea509af..4423f8c 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; @@ -40,11 +38,11 @@ class IndexPruneTest { void testPrune() throws IOException { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); @@ -64,9 +62,9 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); - IndexRetention indexRetention = new DefaultIndexRetention(); - indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); + indexDefinition.setDelta(2); + indexDefinition.setMinToKeep(2); indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 8f1f4c1..e4b39e9 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -40,11 +40,11 @@ class IndexShiftTest { void testIndexShift() throws Exception { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 3d254f6..ea60953 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -42,7 +42,7 @@ class SearchTest { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -62,7 +62,7 @@ class SearchTest { } try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { // test stream count Stream stream = searchClient.search(qb -> qb diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index bb917be..fb7a9f5 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -36,17 +36,16 @@ class SmokeTest { void smokeTest() throws Exception { try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build(); TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) + .put(helper.getClientSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); assertEquals("test", indexDefinition.getIndex()); assertTrue(indexDefinition.getFullIndexName().startsWith("test")); - assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -62,9 +61,8 @@ class SmokeTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.updateReplicaLevel(indexDefinition, 2); - int replica = adminClient.getReplicaLevel(indexDefinition); - assertEquals(2, replica); + adminClient.updateReplicaLevel(indexDefinition); + assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkProcessor().getLastBulkError() != null) { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 2bda899..c86696e 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -2,21 +2,12 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.support.AbstractClient; -import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -68,8 +59,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Objects.requireNonNull(helper); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); helper.startNode(); - helper.greenHealth(); - logger.info("cluster name = {}", helper.clusterName()); } @Override @@ -122,8 +111,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Node node; - AbstractClient client; - void setHome(String home) { this.home = home; } @@ -140,14 +127,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings() { - return Settings.builder() - .put("cluster.name", getClusterName()) - .put("path.home", getHome()) - .build(); + ElasticsearchClient client() { + return node.client(); } - Settings getTransportSettings() { + Settings getClientSettings() { return Settings.builder() .put("cluster.name", cluster) .put("path.home", getHome()) @@ -159,7 +143,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft void startNode() { buildNode().start(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = node.client().execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); Object obj = response.iterator().next().getTransport().getAddress() .publishAddress(); if (obj instanceof InetSocketTransportAddress) { @@ -180,48 +164,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } Node buildNode() { - String id = "1"; - Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) - .put("node.name", id) - .build(); - node = new MockNode(nodeSettings); - client = (AbstractClient) node.client(); + node = new MockNode(Settings.builder() + .put("name", getClusterName() + "-server-name") // for threadpool name + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("node.name", getClusterName() + "-node") + .put("node.client", false) + .put("node.master", true) + .put("node.data", true) + .build()); return node; } void closeNodes() { - if (client != null) { - logger.info("closing client"); - client.close(); - } if (node != null) { logger.info("closing node"); + node.client().close(); node.close(); } } - void greenHealth() throws IOException { - try { - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) - .timeout(TimeValue.timeValueSeconds(30))).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - throw new IOException("cluster state is " + healthResponse.getStatus().name() - + ", from here on, everything will fail!"); - } - } catch (ElasticsearchTimeoutException e) { - throw new IOException("cluster does not respond to health request, cowardly refusing to continue"); - } - } - - String clusterName() { - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); - ClusterStateResponse clusterStateResponse = - client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - return clusterStateResponse.getClusterName().value(); - } - private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray(); private static final Random random = new SecureRandom();