diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index 0e6dbe5..dbab173 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -8,7 +8,7 @@ import java.util.Map; */ public interface AdminClient extends BasicClient { - Map getMapping(IndexDefinition indexDefinition); + Map getMapping(IndexDefinition indexDefinition); void checkMapping(IndexDefinition indexDefinition); @@ -20,12 +20,11 @@ public interface AdminClient extends BasicClient { AdminClient deleteIndex(IndexDefinition indexDefinition); /** - * Update replica level. + * Update replica level to the one in the index definition. * @param indexDefinition the index definition - * @param level the replica level * @return this */ - AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level); + AdminClient updateReplicaLevel(IndexDefinition indexDefinition); /** * Get replica level. diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index 92786af..a1a7876 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -10,10 +10,6 @@ public interface BulkProcessor extends Closeable, Flushable { void setEnabled(boolean enabled); - void startBulkMode(IndexDefinition indexDefinition); - - void stopBulkMode(IndexDefinition indexDefinition); - void add(DocWriteRequest request); boolean waitForBulkResponses(long timeout, TimeUnit unit); @@ -29,5 +25,4 @@ public interface BulkProcessor extends Closeable, Flushable { void setMaxBulkVolume(long bulkSize); long getMaxBulkVolume(); - } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 6c738e4..fb59f8e 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -1,7 +1,8 @@ package org.xbib.elx.api; import java.time.format.DateTimeFormatter; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; public interface IndexDefinition { @@ -12,69 +13,73 @@ public interface IndexDefinition { */ String TYPE_NAME = "_doc"; - IndexDefinition setIndex(String index); + void setIndex(String index); String getIndex(); - IndexDefinition setType(String type); + void setType(String type); String getType(); - IndexDefinition setFullIndexName(String fullIndexName); + void setFullIndexName(String fullIndexName); String getFullIndexName(); - IndexDefinition setSettings(String settings); + void setSettings(String settings); String getSettings(); - IndexDefinition setMappings(String mappings); + void setMappings(String mappings); - String getMappings(); + Map getMappings(); - IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter); + Set getMappingFields(); + + void setDateTimeFormatter(DateTimeFormatter formatter); DateTimeFormatter getDateTimeFormatter(); - IndexDefinition setDateTimePattern(Pattern pattern); + void setDateTimePattern(Pattern pattern); Pattern getDateTimePattern(); - IndexDefinition setStartBulkRefreshSeconds(int seconds); + void setStartBulkRefreshSeconds(int seconds); int getStartBulkRefreshSeconds(); - IndexDefinition setStopBulkRefreshSeconds(int seconds); + void setStopBulkRefreshSeconds(int seconds); int getStopBulkRefreshSeconds(); - IndexDefinition setEnabled(boolean enabled); + void setEnabled(boolean enabled); boolean isEnabled(); - IndexDefinition setShift(boolean shift); + void setShift(boolean shift); boolean isShiftEnabled(); - IndexDefinition setPrune(boolean prune); + void setPrune(boolean prune); boolean isPruneEnabled(); - IndexDefinition setForceMerge(boolean forcemerge); + void setForceMerge(boolean forcemerge); boolean isForceMergeEnabled(); - IndexDefinition setReplicaLevel(int replicaLevel); + void setShardCount(int shardCount); - int getReplicaLevel(); + int getShardCount(); - IndexDefinition setRetention(IndexRetention indexRetention); + void setReplicaCount(int replicaCount); - IndexRetention getRetention(); + int getReplicaCount(); - IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit); + void setDelta(int delta); - long getMaxWaitTime(); + int getDelta(); - TimeUnit getMaxWaitTimeUnit(); + void setMinToKeep(int minToKeep); + + int getMinToKeep(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java b/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java deleted file mode 100644 index 44116e2..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexRetention.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.xbib.elx.api; - -public interface IndexRetention { - - IndexRetention setDelta(int delta); - - int getDelta(); - - IndexRetention setMinToKeep(int minToKeep); - - int getMinToKeep(); - -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 193727f..1e9cce3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -78,7 +78,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); @Override - public Map getMapping(IndexDefinition indexDefinition) { + public Map getMapping(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return null; } @@ -110,17 +110,18 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) { + public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return this; } - if (level < 1) { + if (indexDefinition.getReplicaCount() < 1) { logger.warn("invalid replica level"); return this; } - logger.info("update replica level for " + indexDefinition + " to " + level); - updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", level, - 30L, TimeUnit.SECONDS); + logger.info("update replica level for " + + indexDefinition + " to " + indexDefinition.getReplicaCount()); + updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", + indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS); waitForHealthyCluster(); return this; } @@ -295,13 +296,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { return indexDefinition != null&& indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() && - indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null ? pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), indexDefinition.getDateTimePattern(), - indexDefinition.getRetention().getDelta(), - indexDefinition.getRetention().getMinToKeep()) : new EmptyPruneResult(); + indexDefinition.getDelta(), + indexDefinition.getMinToKeep()) : new EmptyPruneResult(); } private IndexPruneResult pruneIndex(String index, diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 4110e9f..ac2b5f7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -17,8 +17,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.DeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -27,7 +25,6 @@ import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -89,15 +86,16 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements throw new IllegalArgumentException("no index name given"); } ensureClientIsPresent(); - CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) + CreateIndexRequestBuilder createIndexRequestBuilder = + new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) .setIndex(index); if (indexDefinition.getSettings() == null) { try { XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) + .field("number_of_shards", indexDefinition.getShardCount()) + .field("number_of_replicas", 0) // always 0 .endObject() .endObject(); indexDefinition.setSettings(Strings.toString(builder)); @@ -105,15 +103,18 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements logger.log(Level.WARN, e.getMessage(), e); } } - Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build(); + Settings settings = Settings.builder() + .loadFromSource(indexDefinition.getSettings(), XContentType.JSON) + .put("index.number_of_shards", indexDefinition.getShardCount()) + .put("index.number_of_replicas", 0) // always 0 + .build(); createIndexRequestBuilder.setSettings(settings); try { if (indexDefinition.getMappings() != null) { - Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered(); - createIndexRequestBuilder.addMapping(TYPE_NAME, mappings); + createIndexRequestBuilder.addMapping(TYPE_NAME, indexDefinition.getMappings()); } else { - XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject(); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject().startObject(TYPE_NAME).endObject().endObject(); createIndexRequestBuilder.addMapping(TYPE_NAME, builder); } } catch (IOException e) { @@ -135,9 +136,15 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (isIndexDefinitionDisabled(indexDefinition)) { return; } - if (bulkProcessor != null) { - ensureClientIsPresent(); - bulkProcessor.startBulkMode(indexDefinition); + ensureClientIsPresent(); + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStartBulkRefreshSeconds(); + if (interval != 0) { + logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); + updateIndexSetting(indexName, "refresh_interval", + interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } } @@ -148,7 +155,22 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } if (bulkProcessor != null) { ensureClientIsPresent(); - bulkProcessor.stopBulkMode(indexDefinition); + String indexName = indexDefinition.getFullIndexName(); + int interval = indexDefinition.getStopBulkRefreshSeconds(); + try { + bulkProcessor.flush(); + } catch (IOException e) { + // can never happen + } + if (bulkProcessor.waitForBulkResponses(60L, TimeUnit.SECONDS)) { + if (interval != 0) { + logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); + updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); + } else { + logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); + } + } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index d36d13f..334d26c 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; -import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.util.concurrent.ScheduledFuture; @@ -34,8 +33,6 @@ public class DefaultBulkProcessor implements BulkProcessor { private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class); - private final BulkClient bulkClient; - private final AtomicBoolean enabled; private final ElasticsearchClient client; @@ -59,7 +56,6 @@ public class DefaultBulkProcessor implements BulkProcessor { private final int permits; public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { - this.bulkClient = bulkClient; int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), @@ -96,35 +92,6 @@ public class DefaultBulkProcessor implements BulkProcessor { this.enabled.set(enabled); } - @Override - public void startBulkMode(IndexDefinition indexDefinition) { - String indexName = indexDefinition.getFullIndexName(); - int interval = indexDefinition.getStartBulkRefreshSeconds(); - if (interval != 0) { - logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", - interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); - } else { - logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); - } - } - - @Override - public void stopBulkMode(IndexDefinition indexDefinition) { - String indexName = indexDefinition.getFullIndexName(); - int interval = indexDefinition.getStopBulkRefreshSeconds(); - flush(); - if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { - if (interval != 0) { - logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", - interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); - } else { - logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); - } - } - } - @Override public void setMaxBulkActions(int bulkActions) { this.bulkActions = bulkActions; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index 17c8756..f36e171 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -2,7 +2,6 @@ package org.xbib.elx.common; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -11,7 +10,6 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.yaml.YamlXContent; import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.IndexDefinition; -import org.xbib.elx.api.IndexRetention; import java.io.IOException; import java.io.InputStream; @@ -23,7 +21,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Set; import java.util.regex.Pattern; public class DefaultIndexDefinition implements IndexDefinition { @@ -50,42 +48,43 @@ public class DefaultIndexDefinition implements IndexDefinition { private boolean forcemerge; - private int replicaLevel; + private int shardCount; - private IndexRetention indexRetention; - - private long maxWaitTime; - - private TimeUnit maxWaitTimeUnit; + private int replicaCount; private int startRefreshInterval; private int stopRefreshInterval; + private int delta; + + private int minToKeep; + public DefaultIndexDefinition(String index, String type) { setIndex(index); setType(type); setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); - setMaxWaitTime(30, TimeUnit.SECONDS); + setShardCount(1); setShift(false); setPrune(false); + setForceMerge(false); setEnabled(true); } public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) throws IOException { - String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(), - Parameters.BULK_MAX_WAIT_RESPONSE.getString()); - TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), ""); - setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); String indexName = settings.get("name", index); String indexType = settings.get("type", type); - boolean enabled = settings.getAsBoolean("enabled", true); setIndex(indexName); setType(indexType); + boolean enabled = settings.getAsBoolean("enabled", true); setEnabled(enabled); + boolean forcemerge = settings.getAsBoolean("forcemerge", true); + setForceMerge(forcemerge); + setShardCount(settings.getAsInt("shards", 1)); + setReplicaCount(settings.getAsInt("replicas", 1)); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); setFullIndexName(fullIndexName); setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), @@ -95,7 +94,7 @@ public class DefaultIndexDefinition implements IndexDefinition { if (settings.get("settings") != null && settings.get("mapping") != null) { setSettings(findSettingsFrom(settings.get("settings"))); setMappings(findMappingsFrom(settings.get("mapping"))); - setReplicaLevel(settings.getAsInt("replica", 0)); + setReplicaCount(settings.getAsInt("replica", 0)); boolean shift = settings.getAsBoolean("shift", false); setShift(shift); if (shift) { @@ -113,19 +112,16 @@ public class DefaultIndexDefinition implements IndexDefinition { boolean prune = settings.getAsBoolean("prune", false); setPrune(prune); if (prune) { - IndexRetention indexRetention = new DefaultIndexRetention() - .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) - .setDelta(settings.getAsInt("retention.delta", 0)); - setRetention(indexRetention); + setMinToKeep(settings.getAsInt("retention.mintokeep", 2)); + setDelta(settings.getAsInt("retention.delta", 2)); } } } } @Override - public IndexDefinition setIndex(String index) { + public void setIndex(String index) { this.index = index; - return this; } @Override @@ -134,9 +130,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setType(String type) { + public void setType(String type) { this.type = type; - return this; } @Override @@ -145,9 +140,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setFullIndexName(String fullIndexName) { + public void setFullIndexName(String fullIndexName) { this.fullIndexName = fullIndexName; - return this; } @Override @@ -156,9 +150,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setSettings(String settings) { + public void setSettings(String settings) { this.settings = settings; - return this; } @Override @@ -167,20 +160,38 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setMappings(String mappings) { + public void setMappings(String mappings) { this.mappings = mappings; - return this; } @Override - public String getMappings() { - return mappings; + public Map getMappings() { + if (mappings == null) { + return null; + } + try { + return JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings).mapOrdered(); + } catch (IOException e) { + return null; + } + } + + public Set getMappingFields() { + if (mappings == null) { + return null; + } + try { + return Settings.fromXContent(JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings)).getGroups("properties").keySet(); + } catch (IOException e) { + return null; + } } @Override - public IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter) { + public void setDateTimeFormatter(DateTimeFormatter formatter) { this.formatter = formatter; - return this; } @Override @@ -189,9 +200,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setDateTimePattern(Pattern pattern) { + public void setDateTimePattern(Pattern pattern) { this.pattern = pattern; - return this; } @Override @@ -200,9 +210,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setStartBulkRefreshSeconds(int seconds) { + public void setStartBulkRefreshSeconds(int seconds) { this.startRefreshInterval = seconds; - return this; } @Override @@ -211,9 +220,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setStopBulkRefreshSeconds(int seconds) { + public void setStopBulkRefreshSeconds(int seconds) { this.stopRefreshInterval = seconds; - return this; } @Override @@ -222,9 +230,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setEnabled(boolean enabled) { + public void setEnabled(boolean enabled) { this.enabled = enabled; - return this; } @Override @@ -233,9 +240,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setShift(boolean shift) { + public void setShift(boolean shift) { this.shift = shift; - return this; } @Override @@ -244,9 +250,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setPrune(boolean prune) { + public void setPrune(boolean prune) { this.prune = prune; - return this; } @Override @@ -255,9 +260,8 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setForceMerge(boolean forcemerge) { + public void setForceMerge(boolean forcemerge) { this.forcemerge = forcemerge; - return this; } @Override @@ -266,44 +270,44 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setReplicaLevel(int replicaLevel) { - this.replicaLevel = replicaLevel; - return this; + public void setShardCount(int shardCount) { + this.shardCount = shardCount; } @Override - public int getReplicaLevel() { - return replicaLevel; + public int getShardCount() { + return shardCount; } @Override - public IndexDefinition setRetention(IndexRetention indexRetention) { - this.indexRetention = indexRetention; - return this; + public void setReplicaCount(int replicaCount) { + this.replicaCount = replicaCount; } @Override - public IndexRetention getRetention() { - return indexRetention; + public int getReplicaCount() { + return replicaCount; } @Override - public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) { - this.maxWaitTime = maxWaitTime; - this.maxWaitTimeUnit = timeUnit; - return this; + public void setDelta(int delta) { + this.delta = delta; } @Override - public long getMaxWaitTime() { - return maxWaitTime; + public int getDelta() { + return delta; } @Override - public TimeUnit getMaxWaitTimeUnit() { - return maxWaitTimeUnit; + public void setMinToKeep(int minToKeep) { + this.minToKeep = minToKeep; } + @Override + public int getMinToKeep() { + return minToKeep; + } private static String findSettingsFrom(String string) throws IOException { if (string == null) { return null; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java deleted file mode 100644 index 71a7421..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.xbib.elx.common; - -import org.xbib.elx.api.IndexRetention; - -public class DefaultIndexRetention implements IndexRetention { - - private int delta; - - private int minToKeep; - - public DefaultIndexRetention() { - this.delta = 2; - this.minToKeep = 2; - } - - @Override - public IndexRetention setDelta(int delta) { - this.delta = delta; - return this; - } - - @Override - public int getDelta() { - return delta; - } - - @Override - public IndexRetention setMinToKeep(int minToKeep) { - this.minToKeep = minToKeep; - return this; - } - - @Override - public int getMinToKeep() { - return minToKeep; - } -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 30a3d9e..d33359f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -8,8 +8,6 @@ public enum Parameters { DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), - BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), - BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java index 3c0e98e..d9f9d7a 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java @@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.http.HttpAdminClient; import org.xbib.elx.http.HttpAdminClientProvider; import org.xbib.elx.http.HttpBulkClient; @@ -64,9 +62,9 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); - IndexRetention indexRetention = new DefaultIndexRetention(); - indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); + indexDefinition.setDelta(2); + indexDefinition.setMinToKeep(2); indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java index f2e426c..6250280 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java @@ -45,7 +45,7 @@ class SmokeTest { .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); - assertEquals(0, indexDefinition.getReplicaLevel()); + assertEquals(1, indexDefinition.getReplicaCount()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -61,7 +61,7 @@ class SmokeTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); - adminClient.updateReplicaLevel(indexDefinition, 1); + adminClient.updateReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); @@ -73,14 +73,30 @@ class SmokeTest { XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("properties") + .startObject("name") + .field("type", "keyword") + .endObject() .startObject("location") .field("type", "geo_point") .endObject() + .startObject("point") + .field("type", "object") + .startObject("properties") + .startObject("x") + .field("type", "integer") + .endObject() + .startObject("y") + .field("type", "integer") + .endObject() + .endObject() + .endObject() .endObject() .endObject(); indexDefinition.setMappings(Strings.toString(builder)); + assertTrue(indexDefinition.getMappings().containsKey("properties")); bulkClient.newIndex(indexDefinition); assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); + logger.info("mappings = " + indexDefinition.getMappingFields()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index bae31c9..07ee30f 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; @@ -64,11 +62,9 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); - IndexRetention indexRetention = new DefaultIndexRetention(); - indexRetention.setDelta(2); - indexRetention.setMinToKeep(2); - indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); + indexDefinition.setDelta(2); + indexDefinition.setMinToKeep(2); indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index a4181c2..5011907 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -47,7 +47,7 @@ class SmokeTest { new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); assertEquals("test_smoke", indexDefinition.getIndex()); assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); - assertEquals(0, indexDefinition.getReplicaLevel()); + assertEquals(1, indexDefinition.getReplicaCount()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); @@ -64,7 +64,7 @@ class SmokeTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); - adminClient.updateReplicaLevel(indexDefinition, 1); + adminClient.updateReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); @@ -76,14 +76,30 @@ class SmokeTest { XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("properties") + .startObject("name") + .field("type", "keyword") + .endObject() .startObject("location") .field("type", "geo_point") .endObject() + .startObject("point") + .field("type", "object") + .startObject("properties") + .startObject("x") + .field("type", "integer") + .endObject() + .startObject("y") + .field("type", "integer") + .endObject() + .endObject() + .endObject() .endObject() .endObject(); indexDefinition.setMappings(Strings.toString(builder)); + assertTrue(indexDefinition.getMappings().containsKey("properties")); bulkClient.newIndex(indexDefinition); assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); + logger.info("mappings = " + indexDefinition.getMappingFields()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index 43f7839..05c5da9 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; @@ -64,9 +62,9 @@ class IndexPruneTest { bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); + indexDefinition.setDelta(2); + indexDefinition.setMinToKeep(2); indexDefinition.setPrune(true); - IndexRetention indexRetention = new DefaultIndexRetention(); - indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 1f6b887..1b72852 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -47,7 +47,7 @@ class SmokeTest { new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); assertEquals("test", indexDefinition.getIndex()); assertTrue(indexDefinition.getFullIndexName().startsWith("test")); - assertEquals(0, indexDefinition.getReplicaLevel()); + assertEquals(1, indexDefinition.getReplicaCount()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); @@ -64,7 +64,7 @@ class SmokeTest { bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); - adminClient.updateReplicaLevel(indexDefinition, 1); + adminClient.updateReplicaLevel(indexDefinition); assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); @@ -76,14 +76,30 @@ class SmokeTest { XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("properties") + .startObject("name") + .field("type", "keyword") + .endObject() .startObject("location") .field("type", "geo_point") .endObject() + .startObject("point") + .field("type", "object") + .startObject("properties") + .startObject("x") + .field("type", "integer") + .endObject() + .startObject("y") + .field("type", "integer") + .endObject() + .endObject() + .endObject() .endObject() .endObject(); indexDefinition.setMappings(Strings.toString(builder)); + assertTrue(indexDefinition.getMappings().containsKey("properties")); bulkClient.newIndex(indexDefinition); assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); + logger.info("mappings = " + indexDefinition.getMappingFields()); } } }