clean up API, remove IndexRetention to index definition, move start/stop bulk, default bulk wait

This commit is contained in:
Jörg Prante 2021-04-22 23:29:49 +02:00
parent 40cdde3ae8
commit 9c606eebdd
16 changed files with 206 additions and 226 deletions

View file

@ -8,7 +8,7 @@ import java.util.Map;
*/ */
public interface AdminClient extends BasicClient { public interface AdminClient extends BasicClient {
Map<String, ?> getMapping(IndexDefinition indexDefinition); Map<String, Object> getMapping(IndexDefinition indexDefinition);
void checkMapping(IndexDefinition indexDefinition); void checkMapping(IndexDefinition indexDefinition);
@ -20,12 +20,11 @@ public interface AdminClient extends BasicClient {
AdminClient deleteIndex(IndexDefinition indexDefinition); AdminClient deleteIndex(IndexDefinition indexDefinition);
/** /**
* Update replica level. * Update replica level to the one in the index definition.
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param level the replica level
* @return this * @return this
*/ */
AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level); AdminClient updateReplicaLevel(IndexDefinition indexDefinition);
/** /**
* Get replica level. * Get replica level.

View file

@ -10,10 +10,6 @@ public interface BulkProcessor extends Closeable, Flushable {
void setEnabled(boolean enabled); void setEnabled(boolean enabled);
void startBulkMode(IndexDefinition indexDefinition);
void stopBulkMode(IndexDefinition indexDefinition);
void add(DocWriteRequest<?> request); void add(DocWriteRequest<?> request);
boolean waitForBulkResponses(long timeout, TimeUnit unit); boolean waitForBulkResponses(long timeout, TimeUnit unit);
@ -29,5 +25,4 @@ public interface BulkProcessor extends Closeable, Flushable {
void setMaxBulkVolume(long bulkSize); void setMaxBulkVolume(long bulkSize);
long getMaxBulkVolume(); long getMaxBulkVolume();
} }

View file

@ -1,7 +1,8 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit; import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public interface IndexDefinition { public interface IndexDefinition {
@ -12,69 +13,73 @@ public interface IndexDefinition {
*/ */
String TYPE_NAME = "_doc"; String TYPE_NAME = "_doc";
IndexDefinition setIndex(String index); void setIndex(String index);
String getIndex(); String getIndex();
IndexDefinition setType(String type); void setType(String type);
String getType(); String getType();
IndexDefinition setFullIndexName(String fullIndexName); void setFullIndexName(String fullIndexName);
String getFullIndexName(); String getFullIndexName();
IndexDefinition setSettings(String settings); void setSettings(String settings);
String getSettings(); String getSettings();
IndexDefinition setMappings(String mappings); void setMappings(String mappings);
String getMappings(); Map<String, Object> getMappings();
IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter); Set<String> getMappingFields();
void setDateTimeFormatter(DateTimeFormatter formatter);
DateTimeFormatter getDateTimeFormatter(); DateTimeFormatter getDateTimeFormatter();
IndexDefinition setDateTimePattern(Pattern pattern); void setDateTimePattern(Pattern pattern);
Pattern getDateTimePattern(); Pattern getDateTimePattern();
IndexDefinition setStartBulkRefreshSeconds(int seconds); void setStartBulkRefreshSeconds(int seconds);
int getStartBulkRefreshSeconds(); int getStartBulkRefreshSeconds();
IndexDefinition setStopBulkRefreshSeconds(int seconds); void setStopBulkRefreshSeconds(int seconds);
int getStopBulkRefreshSeconds(); int getStopBulkRefreshSeconds();
IndexDefinition setEnabled(boolean enabled); void setEnabled(boolean enabled);
boolean isEnabled(); boolean isEnabled();
IndexDefinition setShift(boolean shift); void setShift(boolean shift);
boolean isShiftEnabled(); boolean isShiftEnabled();
IndexDefinition setPrune(boolean prune); void setPrune(boolean prune);
boolean isPruneEnabled(); boolean isPruneEnabled();
IndexDefinition setForceMerge(boolean forcemerge); void setForceMerge(boolean forcemerge);
boolean isForceMergeEnabled(); boolean isForceMergeEnabled();
IndexDefinition setReplicaLevel(int replicaLevel); void setShardCount(int shardCount);
int getReplicaLevel(); int getShardCount();
IndexDefinition setRetention(IndexRetention indexRetention); void setReplicaCount(int replicaCount);
IndexRetention getRetention(); int getReplicaCount();
IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit); void setDelta(int delta);
long getMaxWaitTime(); int getDelta();
TimeUnit getMaxWaitTimeUnit(); void setMinToKeep(int minToKeep);
int getMinToKeep();
} }

View file

@ -1,13 +0,0 @@
package org.xbib.elx.api;
public interface IndexRetention {
IndexRetention setDelta(int delta);
int getDelta();
IndexRetention setMinToKeep(int minToKeep);
int getMinToKeep();
}

View file

@ -78,7 +78,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
@Override @Override
public Map<String, ?> getMapping(IndexDefinition indexDefinition) { public Map<String, Object> getMapping(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return null; return null;
} }
@ -110,17 +110,18 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
} }
@Override @Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) { public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return this; return this;
} }
if (level < 1) { if (indexDefinition.getReplicaCount() < 1) {
logger.warn("invalid replica level"); logger.warn("invalid replica level");
return this; return this;
} }
logger.info("update replica level for " + indexDefinition + " to " + level); logger.info("update replica level for " +
updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas", level, indexDefinition + " to " + indexDefinition.getReplicaCount());
30L, TimeUnit.SECONDS); updateIndexSetting(indexDefinition.getFullIndexName(), "number_of_replicas",
indexDefinition.getReplicaCount(), 30L, TimeUnit.SECONDS);
waitForHealthyCluster(); waitForHealthyCluster();
return this; return this;
} }
@ -295,13 +296,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return indexDefinition != null&& indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() && return indexDefinition != null&& indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() &&
indexDefinition.getRetention() != null &&
indexDefinition.getDateTimePattern() != null ? indexDefinition.getDateTimePattern() != null ?
pruneIndex(indexDefinition.getIndex(), pruneIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(), indexDefinition.getFullIndexName(),
indexDefinition.getDateTimePattern(), indexDefinition.getDateTimePattern(),
indexDefinition.getRetention().getDelta(), indexDefinition.getDelta(),
indexDefinition.getRetention().getMinToKeep()) : new EmptyPruneResult(); indexDefinition.getMinToKeep()) : new EmptyPruneResult();
} }
private IndexPruneResult pruneIndex(String index, private IndexPruneResult pruneIndex(String index,

View file

@ -17,8 +17,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -27,7 +25,6 @@ import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -89,15 +86,16 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
throw new IllegalArgumentException("no index name given"); throw new IllegalArgumentException("no index name given");
} }
ensureClientIsPresent(); ensureClientIsPresent();
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) CreateIndexRequestBuilder createIndexRequestBuilder =
new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
.setIndex(index); .setIndex(index);
if (indexDefinition.getSettings() == null) { if (indexDefinition.getSettings() == null) {
try { try {
XContentBuilder builder = JsonXContent.contentBuilder() XContentBuilder builder = JsonXContent.contentBuilder()
.startObject() .startObject()
.startObject("index") .startObject("index")
.field("number_of_shards", 1) .field("number_of_shards", indexDefinition.getShardCount())
.field("number_of_replicas", 0) .field("number_of_replicas", 0) // always 0
.endObject() .endObject()
.endObject(); .endObject();
indexDefinition.setSettings(Strings.toString(builder)); indexDefinition.setSettings(Strings.toString(builder));
@ -105,15 +103,18 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
logger.log(Level.WARN, e.getMessage(), e); logger.log(Level.WARN, e.getMessage(), e);
} }
} }
Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build(); Settings settings = Settings.builder()
.loadFromSource(indexDefinition.getSettings(), XContentType.JSON)
.put("index.number_of_shards", indexDefinition.getShardCount())
.put("index.number_of_replicas", 0) // always 0
.build();
createIndexRequestBuilder.setSettings(settings); createIndexRequestBuilder.setSettings(settings);
try { try {
if (indexDefinition.getMappings() != null) { if (indexDefinition.getMappings() != null) {
Map<String, Object> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, createIndexRequestBuilder.addMapping(TYPE_NAME, indexDefinition.getMappings());
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered();
createIndexRequestBuilder.addMapping(TYPE_NAME, mappings);
} else { } else {
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject(); XContentBuilder builder = JsonXContent.contentBuilder()
.startObject().startObject(TYPE_NAME).endObject().endObject();
createIndexRequestBuilder.addMapping(TYPE_NAME, builder); createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
} }
} catch (IOException e) { } catch (IOException e) {
@ -135,9 +136,15 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return; return;
} }
if (bulkProcessor != null) {
ensureClientIsPresent(); ensureClientIsPresent();
bulkProcessor.startBulkMode(indexDefinition); String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStartBulkRefreshSeconds();
if (interval != 0) {
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
updateIndexSetting(indexName, "refresh_interval",
interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
} }
} }
@ -148,7 +155,22 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
if (bulkProcessor != null) { if (bulkProcessor != null) {
ensureClientIsPresent(); ensureClientIsPresent();
bulkProcessor.stopBulkMode(indexDefinition); String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStopBulkRefreshSeconds();
try {
bulkProcessor.flush();
} catch (IOException e) {
// can never happen
}
if (bulkProcessor.waitForBulkResponses(60L, TimeUnit.SECONDS)) {
if (interval != 0) {
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
}
}
} }
} }

View file

@ -14,7 +14,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkClient;
import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -34,8 +33,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class); private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class);
private final BulkClient bulkClient;
private final AtomicBoolean enabled; private final AtomicBoolean enabled;
private final ElasticsearchClient client; private final ElasticsearchClient client;
@ -59,7 +56,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final int permits; private final int permits;
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
this.bulkClient = bulkClient;
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
@ -96,35 +92,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.enabled.set(enabled); this.enabled.set(enabled);
} }
@Override
public void startBulkMode(IndexDefinition indexDefinition) {
String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStartBulkRefreshSeconds();
if (interval != 0) {
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
bulkClient.updateIndexSetting(indexName, "refresh_interval",
interval >=0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
}
}
@Override
public void stopBulkMode(IndexDefinition indexDefinition) {
String indexName = indexDefinition.getFullIndexName();
int interval = indexDefinition.getStopBulkRefreshSeconds();
flush();
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
if (interval != 0) {
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
bulkClient.updateIndexSetting(indexName, "refresh_interval",
interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS);
} else {
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
}
}
}
@Override @Override
public void setMaxBulkActions(int bulkActions) { public void setMaxBulkActions(int bulkActions) {
this.bulkActions = bulkActions; this.bulkActions = bulkActions;

View file

@ -2,7 +2,6 @@ package org.xbib.elx.common;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -11,7 +10,6 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.yaml.YamlXContent; import org.elasticsearch.common.xcontent.yaml.YamlXContent;
import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.AdminClient;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexRetention;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -23,7 +21,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class DefaultIndexDefinition implements IndexDefinition { public class DefaultIndexDefinition implements IndexDefinition {
@ -50,42 +48,43 @@ public class DefaultIndexDefinition implements IndexDefinition {
private boolean forcemerge; private boolean forcemerge;
private int replicaLevel; private int shardCount;
private IndexRetention indexRetention; private int replicaCount;
private long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private int startRefreshInterval; private int startRefreshInterval;
private int stopRefreshInterval; private int stopRefreshInterval;
private int delta;
private int minToKeep;
public DefaultIndexDefinition(String index, String type) { public DefaultIndexDefinition(String index, String type) {
setIndex(index); setIndex(index);
setType(type); setType(type);
setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now()));
setMaxWaitTime(30, TimeUnit.SECONDS); setShardCount(1);
setShift(false); setShift(false);
setPrune(false); setPrune(false);
setForceMerge(false);
setEnabled(true); setEnabled(true);
} }
public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings)
throws IOException { throws IOException {
String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(),
Parameters.BULK_MAX_WAIT_RESPONSE.getString());
TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), "");
setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS);
String indexName = settings.get("name", index); String indexName = settings.get("name", index);
String indexType = settings.get("type", type); String indexType = settings.get("type", type);
boolean enabled = settings.getAsBoolean("enabled", true);
setIndex(indexName); setIndex(indexName);
setType(indexType); setType(indexType);
boolean enabled = settings.getAsBoolean("enabled", true);
setEnabled(enabled); setEnabled(enabled);
boolean forcemerge = settings.getAsBoolean("forcemerge", true);
setForceMerge(forcemerge);
setShardCount(settings.getAsInt("shards", 1));
setReplicaCount(settings.getAsInt("replicas", 1));
String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName);
setFullIndexName(fullIndexName); setFullIndexName(fullIndexName);
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(),
@ -95,7 +94,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
if (settings.get("settings") != null && settings.get("mapping") != null) { if (settings.get("settings") != null && settings.get("mapping") != null) {
setSettings(findSettingsFrom(settings.get("settings"))); setSettings(findSettingsFrom(settings.get("settings")));
setMappings(findMappingsFrom(settings.get("mapping"))); setMappings(findMappingsFrom(settings.get("mapping")));
setReplicaLevel(settings.getAsInt("replica", 0)); setReplicaCount(settings.getAsInt("replica", 0));
boolean shift = settings.getAsBoolean("shift", false); boolean shift = settings.getAsBoolean("shift", false);
setShift(shift); setShift(shift);
if (shift) { if (shift) {
@ -113,19 +112,16 @@ public class DefaultIndexDefinition implements IndexDefinition {
boolean prune = settings.getAsBoolean("prune", false); boolean prune = settings.getAsBoolean("prune", false);
setPrune(prune); setPrune(prune);
if (prune) { if (prune) {
IndexRetention indexRetention = new DefaultIndexRetention() setMinToKeep(settings.getAsInt("retention.mintokeep", 2));
.setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) setDelta(settings.getAsInt("retention.delta", 2));
.setDelta(settings.getAsInt("retention.delta", 0));
setRetention(indexRetention);
} }
} }
} }
} }
@Override @Override
public IndexDefinition setIndex(String index) { public void setIndex(String index) {
this.index = index; this.index = index;
return this;
} }
@Override @Override
@ -134,9 +130,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setType(String type) { public void setType(String type) {
this.type = type; this.type = type;
return this;
} }
@Override @Override
@ -145,9 +140,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setFullIndexName(String fullIndexName) { public void setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName; this.fullIndexName = fullIndexName;
return this;
} }
@Override @Override
@ -156,9 +150,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setSettings(String settings) { public void setSettings(String settings) {
this.settings = settings; this.settings = settings;
return this;
} }
@Override @Override
@ -167,20 +160,38 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setMappings(String mappings) { public void setMappings(String mappings) {
this.mappings = mappings; this.mappings = mappings;
return this;
} }
@Override @Override
public String getMappings() { public Map<String, Object> getMappings() {
return mappings; if (mappings == null) {
return null;
}
try {
return JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings).mapOrdered();
} catch (IOException e) {
return null;
}
}
public Set<String> getMappingFields() {
if (mappings == null) {
return null;
}
try {
return Settings.fromXContent(JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings)).getGroups("properties").keySet();
} catch (IOException e) {
return null;
}
} }
@Override @Override
public IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter) { public void setDateTimeFormatter(DateTimeFormatter formatter) {
this.formatter = formatter; this.formatter = formatter;
return this;
} }
@Override @Override
@ -189,9 +200,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setDateTimePattern(Pattern pattern) { public void setDateTimePattern(Pattern pattern) {
this.pattern = pattern; this.pattern = pattern;
return this;
} }
@Override @Override
@ -200,9 +210,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setStartBulkRefreshSeconds(int seconds) { public void setStartBulkRefreshSeconds(int seconds) {
this.startRefreshInterval = seconds; this.startRefreshInterval = seconds;
return this;
} }
@Override @Override
@ -211,9 +220,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setStopBulkRefreshSeconds(int seconds) { public void setStopBulkRefreshSeconds(int seconds) {
this.stopRefreshInterval = seconds; this.stopRefreshInterval = seconds;
return this;
} }
@Override @Override
@ -222,9 +230,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setEnabled(boolean enabled) { public void setEnabled(boolean enabled) {
this.enabled = enabled; this.enabled = enabled;
return this;
} }
@Override @Override
@ -233,9 +240,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setShift(boolean shift) { public void setShift(boolean shift) {
this.shift = shift; this.shift = shift;
return this;
} }
@Override @Override
@ -244,9 +250,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setPrune(boolean prune) { public void setPrune(boolean prune) {
this.prune = prune; this.prune = prune;
return this;
} }
@Override @Override
@ -255,9 +260,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setForceMerge(boolean forcemerge) { public void setForceMerge(boolean forcemerge) {
this.forcemerge = forcemerge; this.forcemerge = forcemerge;
return this;
} }
@Override @Override
@ -266,44 +270,44 @@ public class DefaultIndexDefinition implements IndexDefinition {
} }
@Override @Override
public IndexDefinition setReplicaLevel(int replicaLevel) { public void setShardCount(int shardCount) {
this.replicaLevel = replicaLevel; this.shardCount = shardCount;
return this;
} }
@Override @Override
public int getReplicaLevel() { public int getShardCount() {
return replicaLevel; return shardCount;
} }
@Override @Override
public IndexDefinition setRetention(IndexRetention indexRetention) { public void setReplicaCount(int replicaCount) {
this.indexRetention = indexRetention; this.replicaCount = replicaCount;
return this;
} }
@Override @Override
public IndexRetention getRetention() { public int getReplicaCount() {
return indexRetention; return replicaCount;
} }
@Override @Override
public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) { public void setDelta(int delta) {
this.maxWaitTime = maxWaitTime; this.delta = delta;
this.maxWaitTimeUnit = timeUnit;
return this;
} }
@Override @Override
public long getMaxWaitTime() { public int getDelta() {
return maxWaitTime; return delta;
} }
@Override @Override
public TimeUnit getMaxWaitTimeUnit() { public void setMinToKeep(int minToKeep) {
return maxWaitTimeUnit; this.minToKeep = minToKeep;
} }
@Override
public int getMinToKeep() {
return minToKeep;
}
private static String findSettingsFrom(String string) throws IOException { private static String findSettingsFrom(String string) throws IOException {
if (string == null) { if (string == null) {
return null; return null;

View file

@ -1,37 +0,0 @@
package org.xbib.elx.common;
import org.xbib.elx.api.IndexRetention;
public class DefaultIndexRetention implements IndexRetention {
private int delta;
private int minToKeep;
public DefaultIndexRetention() {
this.delta = 2;
this.minToKeep = 2;
}
@Override
public IndexRetention setDelta(int delta) {
this.delta = delta;
return this;
}
@Override
public int getDelta() {
return delta;
}
@Override
public IndexRetention setMinToKeep(int minToKeep) {
this.minToKeep = minToKeep;
return this;
}
@Override
public int getMinToKeep() {
return minToKeep;
}
}

View file

@ -8,8 +8,6 @@ public enum Parameters {
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"),
BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1),
BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),

View file

@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.DefaultIndexRetention;
import org.xbib.elx.http.HttpAdminClient; import org.xbib.elx.http.HttpAdminClient;
import org.xbib.elx.http.HttpAdminClientProvider; import org.xbib.elx.http.HttpAdminClientProvider;
import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClient;
@ -64,9 +62,9 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
IndexRetention indexRetention = new DefaultIndexRetention();
indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
indexDefinition.setDelta(2);
indexDefinition.setMinToKeep(2);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult); logger.info("prune result = " + indexPruneResult);

View file

@ -45,7 +45,7 @@ class SmokeTest {
.build()) { .build()) {
IndexDefinition indexDefinition = IndexDefinition indexDefinition =
new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY);
assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(1, indexDefinition.getReplicaCount());
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
@ -61,7 +61,7 @@ class SmokeTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition, 1); adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
@ -73,14 +73,30 @@ class SmokeTest {
XContentBuilder builder = JsonXContent.contentBuilder() XContentBuilder builder = JsonXContent.contentBuilder()
.startObject() .startObject()
.startObject("properties") .startObject("properties")
.startObject("name")
.field("type", "keyword")
.endObject()
.startObject("location") .startObject("location")
.field("type", "geo_point") .field("type", "geo_point")
.endObject() .endObject()
.startObject("point")
.field("type", "object")
.startObject("properties")
.startObject("x")
.field("type", "integer")
.endObject()
.startObject("y")
.field("type", "integer")
.endObject()
.endObject()
.endObject()
.endObject() .endObject()
.endObject(); .endObject();
indexDefinition.setMappings(Strings.toString(builder)); indexDefinition.setMappings(Strings.toString(builder));
assertTrue(indexDefinition.getMappings().containsKey("properties"));
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
logger.info("mappings = " + indexDefinition.getMappingFields());
} }
} }
} }

View file

@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.DefaultIndexRetention;
import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeAdminClientProvider;
import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClient;
@ -64,11 +62,9 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
IndexRetention indexRetention = new DefaultIndexRetention();
indexRetention.setDelta(2);
indexRetention.setMinToKeep(2);
indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
indexDefinition.setDelta(2);
indexDefinition.setMinToKeep(2);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult); logger.info("prune result = " + indexPruneResult);

View file

@ -47,7 +47,7 @@ class SmokeTest {
new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY);
assertEquals("test_smoke", indexDefinition.getIndex()); assertEquals("test_smoke", indexDefinition.getIndex());
assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke"));
assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(1, indexDefinition.getReplicaCount());
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
indexDefinition.setType("doc"); indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -64,7 +64,7 @@ class SmokeTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition, 1); adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
@ -76,14 +76,30 @@ class SmokeTest {
XContentBuilder builder = JsonXContent.contentBuilder() XContentBuilder builder = JsonXContent.contentBuilder()
.startObject() .startObject()
.startObject("properties") .startObject("properties")
.startObject("name")
.field("type", "keyword")
.endObject()
.startObject("location") .startObject("location")
.field("type", "geo_point") .field("type", "geo_point")
.endObject() .endObject()
.startObject("point")
.field("type", "object")
.startObject("properties")
.startObject("x")
.field("type", "integer")
.endObject()
.startObject("y")
.field("type", "integer")
.endObject()
.endObject()
.endObject()
.endObject() .endObject()
.endObject(); .endObject();
indexDefinition.setMappings(Strings.toString(builder)); indexDefinition.setMappings(Strings.toString(builder));
assertTrue(indexDefinition.getMappings().containsKey("properties"));
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
logger.info("mappings = " + indexDefinition.getMappingFields());
} }
} }
} }

View file

@ -6,10 +6,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.common.DefaultIndexRetention;
import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClient;
import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportAdminClientProvider;
import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClient;
@ -64,9 +62,9 @@ class IndexPruneTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
indexDefinition.setShift(true); indexDefinition.setShift(true);
adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null);
indexDefinition.setDelta(2);
indexDefinition.setMinToKeep(2);
indexDefinition.setPrune(true); indexDefinition.setPrune(true);
IndexRetention indexRetention = new DefaultIndexRetention();
indexDefinition.setRetention(indexRetention);
indexDefinition.setEnabled(true); indexDefinition.setEnabled(true);
IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition);
logger.info("prune result = " + indexPruneResult); logger.info("prune result = " + indexPruneResult);

View file

@ -47,7 +47,7 @@ class SmokeTest {
new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY);
assertEquals("test", indexDefinition.getIndex()); assertEquals("test", indexDefinition.getIndex());
assertTrue(indexDefinition.getFullIndexName().startsWith("test")); assertTrue(indexDefinition.getFullIndexName().startsWith("test"));
assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(1, indexDefinition.getReplicaCount());
assertEquals(helper.getClusterName(), adminClient.getClusterName()); assertEquals(helper.getClusterName(), adminClient.getClusterName());
indexDefinition.setType("doc"); indexDefinition.setType("doc");
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
@ -64,7 +64,7 @@ class SmokeTest {
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}");
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS)); assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
adminClient.updateReplicaLevel(indexDefinition, 1); adminClient.updateReplicaLevel(indexDefinition);
assertEquals(1, adminClient.getReplicaLevel(indexDefinition)); assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
@ -76,14 +76,30 @@ class SmokeTest {
XContentBuilder builder = JsonXContent.contentBuilder() XContentBuilder builder = JsonXContent.contentBuilder()
.startObject() .startObject()
.startObject("properties") .startObject("properties")
.startObject("name")
.field("type", "keyword")
.endObject()
.startObject("location") .startObject("location")
.field("type", "geo_point") .field("type", "geo_point")
.endObject() .endObject()
.startObject("point")
.field("type", "object")
.startObject("properties")
.startObject("x")
.field("type", "integer")
.endObject()
.startObject("y")
.field("type", "integer")
.endObject()
.endObject()
.endObject()
.endObject() .endObject()
.endObject(); .endObject();
indexDefinition.setMappings(Strings.toString(builder)); indexDefinition.setMappings(Strings.toString(builder));
assertTrue(indexDefinition.getMappings().containsKey("properties"));
bulkClient.newIndex(indexDefinition); bulkClient.newIndex(indexDefinition);
assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties"));
logger.info("mappings = " + indexDefinition.getMappingFields());
} }
} }
} }