diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index 5056b5e..4a3abed 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -8,6 +8,12 @@ import java.util.Map; */ public interface AdminClient extends BasicClient { + /** + * Get the mapping of an index. + * + * @param indexDefinition the index definition + * @return the mapping + */ Map getMapping(IndexDefinition indexDefinition); void checkMapping(IndexDefinition indexDefinition); @@ -15,36 +21,45 @@ public interface AdminClient extends BasicClient { /** * Delete an index. * @param indexDefinition the index definition - * @return this */ - AdminClient deleteIndex(IndexDefinition indexDefinition); + void deleteIndex(IndexDefinition indexDefinition); - AdminClient deleteIndex(String indexName); + /** + * Delete an index. + * @param indexName the index name + */ + void deleteIndex(String indexName); /** * Close an index. * @param indexDefinition the index definition - * @return this */ - AdminClient closeIndex(IndexDefinition indexDefinition); + void closeIndex(IndexDefinition indexDefinition); - AdminClient closeIndex(String indexName); + /** + * Close an index. + * @param indexName the index name + */ + void closeIndex(String indexName); /** * Open an index. * @param indexDefinition the index definition - * @return this */ - AdminClient openIndex(IndexDefinition indexDefinition); + void openIndex(IndexDefinition indexDefinition); + + /** + * Open an index. + * @param indexName the index name + */ + void openIndex(String indexName); - AdminClient openIndex(String indexName); /** * Update replica level to the one in the index definition. * @param indexDefinition the index definition - * @return this */ - AdminClient updateReplicaLevel(IndexDefinition indexDefinition); + void updateReplicaLevel(IndexDefinition indexDefinition); /** * Get replica level. @@ -56,19 +71,32 @@ public interface AdminClient extends BasicClient { /** * Force segment merge of an index. * @param indexDefinition the index definition - * @return this + * @param maxNumSegments maximum number of segments */ - boolean forceMerge(IndexDefinition indexDefinition); + void forceMerge(IndexDefinition indexDefinition, int maxNumSegments); - Collection resolveIndex(String index); + /** + * Force segment merge of an index. + * @param indexName the index name + * @param maxNumSegments maximum number of segments + */ + void forceMerge(String indexName, int maxNumSegments); + + /** + * Resolve an alias. + * + * @param alias the alias name. + * @return the index names in ordered sequence or an empty list if there is no such index + */ + Collection getAlias(String alias); /** - * Resolve alias. + * Resolve alias to a list of indices from cluster state. * * @param alias the alias * @return the index names in ordered sequence behind the alias or an empty list if there is no such alias */ - Collection resolveAlias(String alias); + Collection resolveAliasFromClusterState(String alias); /** * Resolve alias to all connected indices, sort index names with most recent timestamp on top, return this index diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 9023d5a..ffa69db 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -2,6 +2,7 @@ package org.xbib.elx.common; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; @@ -99,18 +100,18 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public AdminClient deleteIndex(IndexDefinition indexDefinition) { + public void deleteIndex(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { - return this; + return; } - return deleteIndex(indexDefinition.getFullIndexName()); + deleteIndex(indexDefinition.getFullIndexName()); } @Override - public AdminClient deleteIndex(String indexName) { + public void deleteIndex(String indexName) { if (indexName == null) { logger.warn("no index name given to delete index"); - return this; + return; } ensureClientIsPresent(); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(indexName); @@ -119,22 +120,21 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements logger.info("index " + indexName + " deleted"); } waitForHealthyCluster(); - return this; } @Override - public AdminClient closeIndex(IndexDefinition indexDefinition) { + public void closeIndex(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { - return this; + return; } - return closeIndex(indexDefinition.getFullIndexName()); + closeIndex(indexDefinition.getFullIndexName()); } @Override - public AdminClient closeIndex(String indexName) { + public void closeIndex(String indexName) { if (indexName == null) { logger.warn("no index name given to close index"); - return this; + return; } ensureClientIsPresent(); CloseIndexRequest closeIndexRequest = new CloseIndexRequest().indices(indexName); @@ -149,22 +149,21 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } }); } - return this; } @Override - public AdminClient openIndex(IndexDefinition indexDefinition) { + public void openIndex(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { - return this; + return; } - return openIndex(indexDefinition.getFullIndexName()); + openIndex(indexDefinition.getFullIndexName()); } @Override - public AdminClient openIndex(String indexName) { + public void openIndex(String indexName) { if (indexName == null) { logger.warn("no index name given to close index"); - return this; + return; } ensureClientIsPresent(); OpenIndexRequest openIndexRequest = new OpenIndexRequest().indices(indexName); @@ -172,18 +171,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (openIndexResponse.isAcknowledged()) { logger.info("index " + indexName + " opened"); } - return this; } @Override - public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) { + public void updateReplicaLevel(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { - return this; + return; } if (indexDefinition.getReplicaCount() < 0) { logger.warn("invalid replica level defined for index " + indexDefinition.getIndex() + ": " + indexDefinition.getReplicaCount()); - return this; + return; } logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount()); int currentReplicaLevel = getReplicaLevel(indexDefinition); @@ -203,7 +201,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements putClusterSetting("indices.recovery.max_bytes_per_sec", "40mb", 30L, TimeUnit.SECONDS); logger.info("recovery boost deactivated"); } - return this; } @Override @@ -214,7 +211,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements ensureClientIsPresent(); String index = indexDefinition.getFullIndexName(); GetSettingsRequest request = new GetSettingsRequest().indices(index); - GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet(); + GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request) + .actionGet(); int replica = -1; for (ObjectObjectCursor cursor : response.getIndexToSettings()) { Settings settings = cursor.value; @@ -225,27 +223,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return replica; } - public Collection resolveIndex(String prefix) { - if (prefix == null) { - return null; - } - ensureClientIsPresent(); - GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(prefix); - GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); - Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); - Set indices = new TreeSet<>(Collections.reverseOrder()); - for (ObjectCursor indexName : getAliasesResponse.getAliases().keys()) { - Matcher m = pattern.matcher(indexName.value); - if (m.matches() && prefix.equals(m.group(1))) { - indices.add(indexName.value); - } - } - return indices; - } - @Override public String resolveMostRecentIndex(String alias) { - Collection indices = resolveIndex(alias); + Collection indices = getAlias(alias); return indices.isEmpty() ? alias : indices.iterator().next(); } @@ -259,8 +239,26 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return getFilters(client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet()); } + + @Override + public Collection getAlias(String alias) { + if (alias == null) { + return null; + } + ensureClientIsPresent(); + GetAliasesRequest getAliasesRequest = new GetAliasesRequest() + .aliases(alias); + GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest) + .actionGet(); + Set set = new TreeSet<>(); + for (ObjectCursor string : getAliasesResponse.getAliases().keys()) { + set.add(string.value); + } + return set; + } + @Override - public List resolveAlias(String alias) { + public List resolveAliasFromClusterState(String alias) { if (alias == null) { return Collections.emptyList(); } @@ -300,7 +298,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } if (indexDefinition.isShiftEnabled()) { if (indexDefinition.isCloseShifted()) { - resolveIndex(indexDefinition.getIndex()).stream() + getAlias(indexDefinition.getIndex()).stream() .filter(s -> !s.equals(indexDefinition.getFullIndexName())) .forEach(this::closeIndex); } @@ -324,7 +322,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return new EmptyIndexShiftResult(); // nothing to shift to } // two situations: 1. a new alias 2. there is already an old index with the alias - Optional oldIndex = resolveAlias(index).stream().sorted().findFirst(); + Optional oldIndex = resolveAliasFromClusterState(index).stream().sorted().findFirst(); Map oldAliasMap = oldIndex.map(this::getAliases).orElse(null); logger.info("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap); final List newAliases = new ArrayList<>(); @@ -492,24 +490,31 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public boolean forceMerge(IndexDefinition indexDefinition) { + public void forceMerge(IndexDefinition indexDefinition, int maxNumSegments) { if (isIndexDefinitionDisabled(indexDefinition)) { - return false; + return; } if (!indexDefinition.isForceMergeEnabled()) { - return false; + logger.info("force merge is disabled (this is the default in an index definition)"); + return; } + forceMerge(indexDefinition.getFullIndexName(), maxNumSegments); + } + + @Override + public void forceMerge(String indexName, int maxNumSegments) { ensureClientIsPresent(); - logger.info("force merge of " + indexDefinition); + logger.info("starting force merge of " + indexName + " to " + maxNumSegments + " segments"); ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); - forceMergeRequest.indices(indexDefinition.getFullIndexName()); + forceMergeRequest.indices(indexName); + forceMergeRequest.maxNumSegments(maxNumSegments); ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) .actionGet(); + logger.log(Level.INFO, "after force merge, status = " + forceMergeResponse.getStatus()); if (forceMergeResponse.getFailedShards() > 0) { throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); } waitForHealthyCluster(); - return true; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index a01b5a2..2b56aa2 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -96,7 +96,7 @@ public class DefaultIndexDefinition implements IndexDefinition { setForceMerge(forcemerge); setShardCount(settings.getAsInt("shards", 1)); setReplicaCount(settings.getAsInt("replicas", 1)); - String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); + String fullIndexName = adminClient.resolveAliasFromClusterState(indexName).stream().findFirst().orElse(indexName); setFullIndexName(fullIndexName); setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), Parameters.BULK_START_REFRESH_SECONDS.getInteger())); @@ -118,7 +118,7 @@ public class DefaultIndexDefinition implements IndexDefinition { Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); setDateTimePattern(dateTimePattern); String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); - fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); + fullIndexName = adminClient.resolveAliasFromClusterState(fullName).stream().findFirst().orElse(fullName); setFullIndexName(fullIndexName); boolean prune = settings.getAsBoolean("prune", false); setPrune(prune); @@ -399,4 +399,9 @@ public class DefaultIndexDefinition implements IndexDefinition { return null; } } + + @Override + public String toString() { + return index + "[" + fullIndexName + "]"; + } } diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/alias/HttpIndicesAliasesAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/alias/HttpIndicesAliasesAction.java index 20a41a5..6cf91bf 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/alias/HttpIndicesAliasesAction.java +++ b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/alias/HttpIndicesAliasesAction.java @@ -1,6 +1,5 @@ package org.xbib.elx.http.action.admin.indices.alias; -import org.apache.logging.log4j.Level; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -28,8 +27,7 @@ public class HttpIndicesAliasesAction extends HttpAction list = new LinkedList<>(); - return new CloseIndexResponse(acknowledgedResponse.isAcknowledged(), shardAcknowledged, list); + public CloseIndexResponse fromXContent(XContentParser parser) throws IOException { + AcknowledgedResponse acknowledgedResponse = CloseIndexResponse.fromXContent(parser); + if (parser.currentToken() == null) { + parser.nextToken(); + } + boolean shardAcknowledged = true; + List list = new LinkedList<>(); + return new CloseIndexResponse(acknowledgedResponse.isAcknowledged(), shardAcknowledged, list); } } diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/forcemerge/HttpForceMergeAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/forcemerge/HttpForceMergeAction.java new file mode 100644 index 0000000..ce5e891 --- /dev/null +++ b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/forcemerge/HttpForceMergeAction.java @@ -0,0 +1,47 @@ +package org.xbib.elx.http.action.admin.indices.forcemerge; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.xcontent.XContentParser; +import org.xbib.elx.http.HttpAction; +import org.xbib.net.http.client.HttpResponse; +import org.xbib.net.http.client.netty.HttpRequestBuilder; + +public class HttpForceMergeAction extends HttpAction { + + @Override + public ActionType getActionInstance() { + return ForceMergeAction.INSTANCE; + } + + @Override + protected HttpRequestBuilder createHttpRequest(String url, ForceMergeRequest request) throws IOException { + List params = new ArrayList<>(); + if (request.maxNumSegments() > 0) { + params.add("max_num_segments=" + request.maxNumSegments()); + } + if (request.onlyExpungeDeletes()) { + params.add("only_expunge_deletes=true"); + } + if (!request.flush()) { + params.add("flush=false"); + } + String paramStr = ""; + if (!params.isEmpty()) { + paramStr = "?" + String.join("&", params); + } + String index = request.indices() != null ? String.join(",", request.indices()) + "/" : ""; + return newPostRequest(url, "/" + index + "/_forcemerge" + paramStr); + } + + @Override + protected CheckedFunction entityParser(HttpResponse httpResponse) { + return ForceMergeResponse::fromXContent; + } +} diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/resolve/HttpResolveIndexAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/resolve/HttpResolveIndexAction.java new file mode 100644 index 0000000..133295d --- /dev/null +++ b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/resolve/HttpResolveIndexAction.java @@ -0,0 +1,45 @@ +package org.xbib.elx.http.action.admin.indices.resolve; + +import java.io.IOException; +import java.util.List; +import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.xcontent.XContentParser; +import org.xbib.elx.http.HttpAction; +import org.xbib.net.http.client.HttpResponse; +import org.xbib.net.http.client.netty.HttpRequestBuilder; + +/** + * Incomplete for now. + */ +public class HttpResolveIndexAction extends HttpAction { + + @Override + public ResolveIndexAction getActionInstance() { + return ResolveIndexAction.INSTANCE; + } + + @Override + protected HttpRequestBuilder createHttpRequest(String url, ResolveIndexAction.Request request) { + String index = request.indices() != null ? String.join(",", request.indices()) + "/" : ""; + return newGetRequest(url, index + "/_resolve"); + } + + @Override + protected CheckedFunction entityParser(HttpResponse httpResponse) { + return this::fromXContent; + } + + public ResolveIndexAction.Response fromXContent(XContentParser parser) throws IOException { + if (parser.currentToken() == null) { + parser.nextToken(); + } + // TODO parsing + // package private constructor. We need to build a binary stream from XContent parsing + // to use the StreamInput constructor. + List resolvedIndices = null; + List resolvedAliases = null; + List resolvedDataStreams = null; + return new ResolveIndexAction.Response(resolvedIndices, resolvedAliases, resolvedDataStreams); + } +} diff --git a/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction b/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction index f78d2b6..086ac3b 100644 --- a/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction +++ b/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction @@ -9,8 +9,10 @@ org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction org.xbib.elx.http.action.admin.indices.close.HttpCloseIndexAction org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction +org.xbib.elx.http.action.admin.indices.forcemerge.HttpForceMergeAction org.xbib.elx.http.action.admin.indices.get.HttpGetIndexAction org.xbib.elx.http.action.admin.indices.refresh.HttpRefreshIndexAction +org.xbib.elx.http.action.admin.indices.resolve.HttpResolveIndexAction org.xbib.elx.http.action.admin.indices.settings.get.HttpGetSettingsAction org.xbib.elx.http.action.admin.indices.settings.put.HttpUpdateSettingsAction org.xbib.elx.http.action.bulk.HttpBulkAction @@ -21,4 +23,4 @@ org.xbib.elx.http.action.search.HttpSearchScrollAction org.xbib.elx.http.action.main.HttpMainAction org.xbib.elx.http.action.get.HttpExistsAction org.xbib.elx.http.action.get.HttpGetAction -org.xbib.elx.http.action.get.HttpMultiGetAction \ No newline at end of file +org.xbib.elx.http.action.get.HttpMultiGetAction diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java index 8541970..48d0867 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java @@ -1,5 +1,6 @@ package org.xbib.elx.http.test; +import java.util.Collection; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -66,7 +67,7 @@ class IndexShiftTest { assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey(indexDefinition.getIndex())); - Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); + Optional resolved = adminClient.resolveAliasFromClusterState(indexDefinition.getIndex()).stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); @@ -96,7 +97,8 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test").stream().findFirst(); + Collection collection = adminClient.resolveAliasFromClusterState("test"); + resolved = collection.stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexTest.java new file mode 100644 index 0000000..6346649 --- /dev/null +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexTest.java @@ -0,0 +1,67 @@ +package org.xbib.elx.http.test; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.http.HttpAdminClient; +import org.xbib.elx.http.HttpAdminClientProvider; +import org.xbib.elx.http.HttpBulkClient; +import org.xbib.elx.http.HttpBulkClientProvider; +import static org.junit.jupiter.api.Assertions.assertNull; + +@ExtendWith(TestExtension.class) +class IndexTest { + + private static final Logger logger = LogManager.getLogger(IndexTest.class.getName()); + + private final TestExtension.Helper helper; + + IndexTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testIndexForceMerge() throws Exception { + try (HttpAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(HttpAdminClientProvider.class) + .put(helper.getClientSettings()) + .build(); + HttpBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(HttpBulkClientProvider.class) + .put(helper.getClientSettings()) + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setFullIndexName("test_force_merge"); + indexDefinition.setForceMerge(true); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); + for (int i = 0; i < 1000; i++) { + bulkClient.index(indexDefinition, helper.randomString(1), false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); + adminClient.forceMerge(indexDefinition, 1); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); + } + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); + } + } + + @Test + void testIndexResolve() throws Exception { + try (HttpAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(HttpAdminClientProvider.class) + .put(helper.getClientSettings()) + .build()) { + + adminClient.getAlias("testindex"); + } + } +} diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 9f35756..96d8980 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -66,7 +66,7 @@ class IndexShiftTest { assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey(indexDefinition.getIndex())); - Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); + Optional resolved = adminClient.resolveAliasFromClusterState(indexDefinition.getIndex()).stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); @@ -96,7 +96,7 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test").stream().findFirst(); + resolved = adminClient.resolveAliasFromClusterState("test").stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexTest.java new file mode 100644 index 0000000..d3cbc83 --- /dev/null +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexTest.java @@ -0,0 +1,56 @@ +package org.xbib.elx.node.test; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.node.NodeAdminClient; +import org.xbib.elx.node.NodeAdminClientProvider; +import org.xbib.elx.node.NodeBulkClient; +import org.xbib.elx.node.NodeBulkClientProvider; +import static org.junit.jupiter.api.Assertions.assertNull; + +@ExtendWith(TestExtension.class) +class IndexTest { + + private static final Logger logger = LogManager.getLogger(IndexTest.class.getName()); + + private final TestExtension.Helper helper; + + IndexTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testIndexForceMerge() throws Exception { + try (NodeAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(NodeAdminClientProvider.class) + .put(helper.getClientSettings()) + .build(); + NodeBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(NodeBulkClientProvider.class) + .put(helper.getClientSettings()) + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setFullIndexName("test_force_merge"); + indexDefinition.setForceMerge(true); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); + for (int i = 0; i < 1000; i++) { + bulkClient.index(indexDefinition, helper.randomString(1), false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); + adminClient.forceMerge(indexDefinition, 1); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); + } + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); + } + } +} diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 7095f5f..22d47ce 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -64,7 +64,7 @@ class IndexShiftTest { assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey(indexDefinition.getIndex())); - Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); + Optional resolved = adminClient.resolveAliasFromClusterState(indexDefinition.getIndex()).stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); @@ -93,7 +93,7 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test").stream().findFirst(); + resolved = adminClient.resolveAliasFromClusterState("test").stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexTest.java new file mode 100644 index 0000000..f0f65e0 --- /dev/null +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexTest.java @@ -0,0 +1,56 @@ +package org.xbib.elx.transport.test; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; +import org.xbib.elx.transport.TransportAdminClient; +import org.xbib.elx.transport.TransportAdminClientProvider; +import org.xbib.elx.transport.TransportBulkClient; +import org.xbib.elx.transport.TransportBulkClientProvider; +import static org.junit.jupiter.api.Assertions.assertNull; + +@ExtendWith(TestExtension.class) +class IndexTest { + + private static final Logger logger = LogManager.getLogger(IndexTest.class.getName()); + + private final TestExtension.Helper helper; + + IndexTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testIndexForceMerge() throws Exception { + try (TransportAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(TransportAdminClientProvider.class) + .put(helper.getClientSettings()) + .build(); + TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getClientSettings()) + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setFullIndexName("test_force_merge"); + indexDefinition.setForceMerge(true); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); + for (int i = 0; i < 1000; i++) { + bulkClient.index(indexDefinition, helper.randomString(1), false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); + adminClient.forceMerge(indexDefinition, 1); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); + } + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); + } + } +} diff --git a/gradle.properties b/gradle.properties index 57199d6..040e4c2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group = org.xbib name = elx -version = 7.10.2.27 +version = 7.10.2.28 org.gradle.warning.mode = ALL