diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index 5db93c4..e00e241 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -132,7 +132,8 @@ public interface AdminClient extends BasicClient { * @param indexAliasAdder method to add aliases * @return this */ - IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases, + IndexShiftResult shiftIndex(IndexDefinition indexDefinition, + List additionalAliases, IndexAliasAdder indexAliasAdder); /** @@ -142,7 +143,9 @@ public interface AdminClient extends BasicClient { * @param additionalAliases a list of names that should be set as index aliases * @return this */ - IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases); + IndexShiftResult shiftIndex(String index, + String fullIndexName, + List additionalAliases); /** * Shift from one index to another. @@ -152,28 +155,19 @@ public interface AdminClient extends BasicClient { * @param adder an adder method to create alias term queries * @return this */ - IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases, + IndexShiftResult shiftIndex(String index, + String fullIndexName, List additionalAliases, IndexAliasAdder adder); - /** - * Prune index. - * @param indexDefinition the index definition - * @return the index prune result - */ - IndexPruneResult pruneIndex(IndexDefinition indexDefinition); /** * Apply retention policy to prune indices. All indices before delta should be deleted, * but the number of mintokeep indices must be kept. * - * @param index index name - * @param fullIndexName index name with timestamp - * @param delta timestamp delta (for index timestamps) - * @param mintokeep minimum number of indices to keep - * @param perform true if pruning should be executed, false if not + * @param indexDefinition index definition * @return the index prune result */ - IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform); + IndexPruneResult pruneIndex(IndexDefinition indexDefinition); /** * Find the timestamp of the most recently indexed document in the index. diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 1c77ccb..4e24f5a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -1,6 +1,8 @@ package org.xbib.elx.api; +import java.time.format.DateTimeFormatter; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; public interface IndexDefinition { @@ -20,9 +22,13 @@ public interface IndexDefinition { String getMappings(); - IndexDefinition setDateTimePattern(String timeWindow); + IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter); - String getDateTimePattern(); + DateTimeFormatter getDateTimeFormatter(); + + IndexDefinition setDateTimePattern(Pattern timeWindow); + + Pattern getDateTimePattern(); IndexDefinition setEnabled(boolean enabled); @@ -36,9 +42,13 @@ public interface IndexDefinition { boolean isShiftEnabled(); - IndexDefinition setForceMerge(boolean hasForceMerge); + IndexDefinition setPrune(boolean prune); - boolean hasForceMerge(); + boolean isPruneEnabled(); + + IndexDefinition setForceMerge(boolean forcemerge); + + boolean isForceMergeEnabled(); IndexDefinition setReplicaLevel(int replicaLevel); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index bad581f..f2c4b85 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -112,27 +112,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } }; - private static final IndexPruneResult EMPTY_INDEX_PRUNE_RESULT = new IndexPruneResult() { - @Override - public State getState() { - return State.NONE; - } - - @Override - public List getCandidateIndices() { - return Collections.emptyList(); - } - - @Override - public List getDeletedIndices() { - return Collections.emptyList(); - } - - @Override - public boolean isAcknowledged() { - return false; - } - }; @Override public Map getMapping(String index) throws IOException { @@ -240,7 +219,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); SortedMap map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup(); AliasOrIndex aliasOrIndex = map.get(alias); - return aliasOrIndex != null ? aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).collect(Collectors.toList()) : null; + return aliasOrIndex != null ? aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).collect(Collectors.toList()) : Collections.emptyList(); } @Override @@ -270,7 +249,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexShiftResult shiftIndex(String index, String fullIndexName, - List additionalAliases, IndexAliasAdder adder) { + List additionalAliases, + IndexAliasAdder adder) { ensureClientIsPresent(); if (index == null) { return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to @@ -282,7 +262,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements // two situations: 1. a new alias 2. there is already an old index with the alias Optional oldIndex = resolveAlias(index).stream().sorted().findFirst(); Map oldAliasMap = oldIndex.map(this::getAliases).orElse(null); - logger.debug("old index = {} old alias map = {}", oldIndex, oldAliasMap); + logger.debug("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap); final List newAliases = new ArrayList<>(); final List moveAliases = new ArrayList<>(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); @@ -347,38 +327,50 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { - return pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), - indexDefinition.getRetention().getDelta(), indexDefinition.getRetention().getMinToKeep(), true); + return indexDefinition.isPruneEnabled() ? + pruneIndex(indexDefinition.getIndex(), + indexDefinition.getFullIndexName(), + indexDefinition.getDateTimePattern(), + indexDefinition.getRetention().getDelta(), + indexDefinition.getRetention().getMinToKeep()) : new EmptyPruneResult(); } - @Override - public IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform) { + private IndexPruneResult pruneIndex(String index, + String protectedIndexName, + Pattern pattern, + int delta, + int mintokeep) { + logger.info("before pruning: index = {} full index = {} delta = {} mintokeep = {} pattern = {}", + index, protectedIndexName, delta, mintokeep, pattern); if (delta == 0 && mintokeep == 0) { - return EMPTY_INDEX_PRUNE_RESULT; + return new EmptyPruneResult(); } - if (index.equals(fullIndexName)) { - return EMPTY_INDEX_PRUNE_RESULT; + if (index.equals(protectedIndexName)) { + return new EmptyPruneResult(); } ensureClientIsPresent(); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); - Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); - logger.info("found {} indices", getIndexResponse.getIndices().length); + logger.info("before pruning: found total of {} indices", getIndexResponse.getIndices().length); List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); - if (m.matches() && index.equals(m.group(1)) && !s.equals(fullIndexName)) { + if (m.matches() && m.group(1).equals(index) && !s.equals(protectedIndexName)) { candidateIndices.add(s); + } else { + logger.info("not a candidate: " + s); } } if (candidateIndices.isEmpty()) { - return EMPTY_INDEX_PRUNE_RESULT; + return new EmptyPruneResult(); } if (mintokeep > 0 && candidateIndices.size() <= mintokeep) { return new NothingToDoPruneResult(candidateIndices, Collections.emptyList()); } + Collections.sort(candidateIndices); + logger.info("found {} candidates", candidateIndices); List indicesToDelete = new ArrayList<>(); - Matcher m1 = pattern.matcher(fullIndexName); + Matcher m1 = pattern.matcher(protectedIndexName); if (m1.matches()) { Integer i1 = Integer.parseInt(m1.group(2)); for (String s : candidateIndices) { @@ -395,6 +387,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (indicesToDelete.isEmpty()) { return new NothingToDoPruneResult(candidateIndices, indicesToDelete); } + logger.warn("deleting {}", indicesToDelete); String[] s = new String[indicesToDelete.size()]; DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest() .indices(indicesToDelete.toArray(s)); @@ -413,8 +406,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(index); searchRequest.source(builder); - SearchResponse searchResponse = - client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); + SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); if (searchResponse.getHits().getHits().length == 1) { SearchHit hit = searchResponse.getHits().getHits()[0]; if (hit.getFields().get(timestampfieldname) != null) { @@ -428,7 +420,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public boolean forceMerge(IndexDefinition indexDefinition) { - if (indexDefinition.hasForceMerge()) { + if (indexDefinition.isForceMergeEnabled()) { return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); } @@ -459,19 +451,13 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements throws IOException { boolean isEnabled = settings.getAsBoolean("enabled", false); String indexName = settings.get("name", index); - String fullIndexName; - String dateTimePattern = settings.get("dateTimePattern"); - if (dateTimePattern != null) { - // check if index name with current date already exists, resolve to it - String fullName = indexName + DateTimeFormatter.ofPattern(dateTimePattern) - .withZone(ZoneId.systemDefault()) // not GMT - .format(LocalDate.now()); - Optional optional = resolveAlias(fullName).stream().findFirst(); - fullIndexName = optional.orElse(fullName); - } else { - // check if index name already exists, resolve to it - fullIndexName = resolveMostRecentIndex(indexName); - } + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); + Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); + String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd"); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat) + .withZone(ZoneId.systemDefault()); + String fullName = indexName + dateTimeFormatter.format(LocalDate.now()); + String fullIndexName = resolveAlias(fullName).stream().findFirst().orElse(fullName); IndexRetention indexRetention = new DefaultIndexRetention() .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) .setDelta(settings.getAsInt("retention.delta", 0)); @@ -481,9 +467,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements .setFullIndexName(fullIndexName) .setSettings(findSettingsFrom(settings.get("settings"))) .setMappings(findMappingsFrom(settings.get("mapping"))) + .setDateTimeFormatter(dateTimeFormatter) .setDateTimePattern(dateTimePattern) .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) .setShift(settings.getAsBoolean("shift", true)) + .setPrune(settings.getAsBoolean("prune", true)) .setReplicaLevel(settings.getAsInt("replica", 0)) .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) .setRetention(indexRetention) @@ -717,6 +705,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements public boolean isAcknowledged() { return response.isAcknowledged(); } + + @Override + public String toString() { + return "PRUNED: " + indicesToDelete; + } } private static class NothingToDoPruneResult implements IndexPruneResult { @@ -749,5 +742,39 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements public boolean isAcknowledged() { return false; } + + @Override + public String toString() { + return "NOTHING TO DO"; + } } + + private static class EmptyPruneResult implements IndexPruneResult { + + @Override + public State getState() { + return State.NONE; + } + + @Override + public List getCandidateIndices() { + return Collections.emptyList(); + } + + @Override + public List getDeletedIndices() { + return Collections.emptyList(); + } + + @Override + public boolean isAcknowledged() { + return false; + } + + @Override + public String toString() { + return "EMPTY PRUNE"; + } + } + } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index 8d2d3fd..0ef0556 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -3,7 +3,9 @@ package org.xbib.elx.common; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexRetention; +import java.time.format.DateTimeFormatter; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; public class DefaultIndexDefinition implements IndexDefinition { @@ -11,7 +13,9 @@ public class DefaultIndexDefinition implements IndexDefinition { private String fullIndexName; - private String dateTimePattern; + private DateTimeFormatter formatter; + + private Pattern pattern; private String settings; @@ -21,9 +25,11 @@ public class DefaultIndexDefinition implements IndexDefinition { private boolean ignoreErrors; - private boolean switchAliases; + private boolean shift; - private boolean hasForceMerge; + private boolean prune; + + private boolean forcemerge; private int replicaLevel; @@ -82,14 +88,25 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setDateTimePattern(String timeWindow) { - this.dateTimePattern = timeWindow; + public IndexDefinition setDateTimeFormatter(DateTimeFormatter formatter) { + this.formatter = formatter; return this; } @Override - public String getDateTimePattern() { - return dateTimePattern; + public DateTimeFormatter getDateTimeFormatter() { + return formatter; + } + + @Override + public IndexDefinition setDateTimePattern(Pattern pattern) { + this.pattern = pattern; + return this; + } + + @Override + public Pattern getDateTimePattern() { + return pattern; } @Override @@ -115,25 +132,36 @@ public class DefaultIndexDefinition implements IndexDefinition { } @Override - public IndexDefinition setShift(boolean switchAliases) { - this.switchAliases = switchAliases; + public IndexDefinition setShift(boolean shift) { + this.shift = shift; return this; } @Override public boolean isShiftEnabled() { - return switchAliases; + return shift; } @Override - public IndexDefinition setForceMerge(boolean hasForceMerge) { - this.hasForceMerge = hasForceMerge; + public IndexDefinition setPrune(boolean prune) { + this.prune = prune; return this; } @Override - public boolean hasForceMerge() { - return hasForceMerge; + public boolean isPruneEnabled() { + return prune; + } + + @Override + public IndexDefinition setForceMerge(boolean forcemerge) { + this.forcemerge = forcemerge; + return this; + } + + @Override + public boolean isForceMergeEnabled() { + return forcemerge; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java index 4e49be3..71a7421 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexRetention.java @@ -8,6 +8,11 @@ public class DefaultIndexRetention implements IndexRetention { private int minToKeep; + public DefaultIndexRetention() { + this.delta = 2; + this.minToKeep = 2; + } + @Override public IndexRetention setDelta(int delta) { this.delta = delta; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index f6e1c2f..0f07a7b 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -5,17 +5,25 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; +import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; import java.io.IOException; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.regex.Pattern; + import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -53,8 +61,19 @@ class IndexPruneTest { adminClient.shiftIndex("test_prune", "test_prune3", Collections.emptyList()); bulkClient.newIndex("test_prune4", settings); adminClient.shiftIndex("test_prune", "test_prune4", Collections.emptyList()); - IndexPruneResult indexPruneResult = - adminClient.pruneIndex("test_prune", "test_prune4", 2, 2, true); + IndexRetention indexRetention = new DefaultIndexRetention(); + indexRetention.setDelta(2); + indexRetention.setMinToKeep(2); + IndexDefinition indexDefinition = new DefaultIndexDefinition(); + indexDefinition.setIndex("test_prune"); + indexDefinition.setFullIndexName("test_prune4"); + indexDefinition.setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); + indexDefinition.setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); + indexDefinition.setRetention(indexRetention); + indexDefinition.setEnabled(true); + indexDefinition.setPrune(true); + IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); + logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3")); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index f451572..7383a80 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -5,18 +5,25 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; +import org.xbib.elx.api.IndexRetention; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.common.DefaultIndexRetention; import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import java.io.IOException; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -55,8 +62,19 @@ class IndexPruneTest { adminClient.shiftIndex("test_prune", "test_prune3", Collections.emptyList()); bulkClient.newIndex("test_prune4", settings); adminClient.shiftIndex("test_prune", "test_prune4", Collections.emptyList()); - IndexPruneResult indexPruneResult = - adminClient.pruneIndex("test_prune", "test_prune4", 2, 2, true); + IndexRetention indexRetention = new DefaultIndexRetention(); + indexRetention.setDelta(2); + indexRetention.setMinToKeep(2); + IndexDefinition indexDefinition = new DefaultIndexDefinition(); + indexDefinition.setIndex("test_prune"); + indexDefinition.setFullIndexName("test_prune4"); + indexDefinition.setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); + indexDefinition.setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); + indexDefinition.setRetention(indexRetention); + indexDefinition.setEnabled(true); + indexDefinition.setPrune(true); + IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); + logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune2")); assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune3")); diff --git a/gradle.properties b/gradle.properties index 25fbe5a..eaf1df9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.26 +version = 2.2.1.27 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0