add force merge, cleanup resolve alias

This commit is contained in:
Jörg Prante 2023-02-11 22:06:45 +01:00
parent 819f31dffc
commit f4c25b6430
15 changed files with 400 additions and 91 deletions

View file

@ -8,6 +8,12 @@ import java.util.Map;
*/
public interface AdminClient extends BasicClient {
/**
* Get the mapping of an index.
*
* @param indexDefinition the index definition
* @return the mapping
*/
Map<String, Object> getMapping(IndexDefinition indexDefinition);
void checkMapping(IndexDefinition indexDefinition);
@ -15,36 +21,45 @@ public interface AdminClient extends BasicClient {
/**
* Delete an index.
* @param indexDefinition the index definition
* @return this
*/
AdminClient deleteIndex(IndexDefinition indexDefinition);
void deleteIndex(IndexDefinition indexDefinition);
AdminClient deleteIndex(String indexName);
/**
* Delete an index.
* @param indexName the index name
*/
void deleteIndex(String indexName);
/**
* Close an index.
* @param indexDefinition the index definition
* @return this
*/
AdminClient closeIndex(IndexDefinition indexDefinition);
void closeIndex(IndexDefinition indexDefinition);
AdminClient closeIndex(String indexName);
/**
* Close an index.
* @param indexName the index name
*/
void closeIndex(String indexName);
/**
* Open an index.
* @param indexDefinition the index definition
* @return this
*/
AdminClient openIndex(IndexDefinition indexDefinition);
void openIndex(IndexDefinition indexDefinition);
/**
* Open an index.
* @param indexName the index name
*/
void openIndex(String indexName);
AdminClient openIndex(String indexName);
/**
* Update replica level to the one in the index definition.
* @param indexDefinition the index definition
* @return this
*/
AdminClient updateReplicaLevel(IndexDefinition indexDefinition);
void updateReplicaLevel(IndexDefinition indexDefinition);
/**
* Get replica level.
@ -56,19 +71,32 @@ public interface AdminClient extends BasicClient {
/**
* Force segment merge of an index.
* @param indexDefinition the index definition
* @return this
* @param maxNumSegments maximum number of segments
*/
boolean forceMerge(IndexDefinition indexDefinition);
Collection<String> resolveIndex(String index);
void forceMerge(IndexDefinition indexDefinition, int maxNumSegments);
/**
* Resolve alias.
* Force segment merge of an index.
* @param indexName the index name
* @param maxNumSegments maximum number of segments
*/
void forceMerge(String indexName, int maxNumSegments);
/**
* Resolve an alias.
*
* @param alias the alias name.
* @return the index names in ordered sequence or an empty list if there is no such index
*/
Collection<String> getAlias(String alias);
/**
* Resolve alias to a list of indices from cluster state.
*
* @param alias the alias
* @return the index names in ordered sequence behind the alias or an empty list if there is no such alias
*/
Collection<String> resolveAlias(String alias);
Collection<String> resolveAliasFromClusterState(String alias);
/**
* Resolve alias to all connected indices, sort index names with most recent timestamp on top, return this index

View file

@ -2,6 +2,7 @@ package org.xbib.elx.common;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
@ -99,18 +100,18 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
@Override
public AdminClient deleteIndex(IndexDefinition indexDefinition) {
public void deleteIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return this;
return;
}
return deleteIndex(indexDefinition.getFullIndexName());
deleteIndex(indexDefinition.getFullIndexName());
}
@Override
public AdminClient deleteIndex(String indexName) {
public void deleteIndex(String indexName) {
if (indexName == null) {
logger.warn("no index name given to delete index");
return this;
return;
}
ensureClientIsPresent();
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(indexName);
@ -119,22 +120,21 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
logger.info("index " + indexName + " deleted");
}
waitForHealthyCluster();
return this;
}
@Override
public AdminClient closeIndex(IndexDefinition indexDefinition) {
public void closeIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return this;
return;
}
return closeIndex(indexDefinition.getFullIndexName());
closeIndex(indexDefinition.getFullIndexName());
}
@Override
public AdminClient closeIndex(String indexName) {
public void closeIndex(String indexName) {
if (indexName == null) {
logger.warn("no index name given to close index");
return this;
return;
}
ensureClientIsPresent();
CloseIndexRequest closeIndexRequest = new CloseIndexRequest().indices(indexName);
@ -149,22 +149,21 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
});
}
return this;
}
@Override
public AdminClient openIndex(IndexDefinition indexDefinition) {
public void openIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return this;
return;
}
return openIndex(indexDefinition.getFullIndexName());
openIndex(indexDefinition.getFullIndexName());
}
@Override
public AdminClient openIndex(String indexName) {
public void openIndex(String indexName) {
if (indexName == null) {
logger.warn("no index name given to close index");
return this;
return;
}
ensureClientIsPresent();
OpenIndexRequest openIndexRequest = new OpenIndexRequest().indices(indexName);
@ -172,18 +171,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (openIndexResponse.isAcknowledged()) {
logger.info("index " + indexName + " opened");
}
return this;
}
@Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition) {
public void updateReplicaLevel(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return this;
return;
}
if (indexDefinition.getReplicaCount() < 0) {
logger.warn("invalid replica level defined for index "
+ indexDefinition.getIndex() + ": " + indexDefinition.getReplicaCount());
return this;
return;
}
logger.info("update replica level for " + indexDefinition + " to " + indexDefinition.getReplicaCount());
int currentReplicaLevel = getReplicaLevel(indexDefinition);
@ -203,7 +201,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
putClusterSetting("indices.recovery.max_bytes_per_sec", "40mb", 30L, TimeUnit.SECONDS);
logger.info("recovery boost deactivated");
}
return this;
}
@Override
@ -214,7 +211,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
ensureClientIsPresent();
String index = indexDefinition.getFullIndexName();
GetSettingsRequest request = new GetSettingsRequest().indices(index);
GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet();
GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request)
.actionGet();
int replica = -1;
for (ObjectObjectCursor<String, Settings> cursor : response.getIndexToSettings()) {
Settings settings = cursor.value;
@ -225,27 +223,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return replica;
}
public Collection<String> resolveIndex(String prefix) {
if (prefix == null) {
return null;
}
ensureClientIsPresent();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(prefix);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> indices = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
Matcher m = pattern.matcher(indexName.value);
if (m.matches() && prefix.equals(m.group(1))) {
indices.add(indexName.value);
}
}
return indices;
}
@Override
public String resolveMostRecentIndex(String alias) {
Collection<String> indices = resolveIndex(alias);
Collection<String> indices = getAlias(alias);
return indices.isEmpty() ? alias : indices.iterator().next();
}
@ -259,8 +239,26 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return getFilters(client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet());
}
@Override
public List<String> resolveAlias(String alias) {
public Collection<String> getAlias(String alias) {
if (alias == null) {
return null;
}
ensureClientIsPresent();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest()
.aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest)
.actionGet();
Set<String> set = new TreeSet<>();
for (ObjectCursor<String> string : getAliasesResponse.getAliases().keys()) {
set.add(string.value);
}
return set;
}
@Override
public List<String> resolveAliasFromClusterState(String alias) {
if (alias == null) {
return Collections.emptyList();
}
@ -300,7 +298,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
if (indexDefinition.isShiftEnabled()) {
if (indexDefinition.isCloseShifted()) {
resolveIndex(indexDefinition.getIndex()).stream()
getAlias(indexDefinition.getIndex()).stream()
.filter(s -> !s.equals(indexDefinition.getFullIndexName()))
.forEach(this::closeIndex);
}
@ -324,7 +322,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return new EmptyIndexShiftResult(); // nothing to shift to
}
// two situations: 1. a new alias 2. there is already an old index with the alias
Optional<String> oldIndex = resolveAlias(index).stream().sorted().findFirst();
Optional<String> oldIndex = resolveAliasFromClusterState(index).stream().sorted().findFirst();
Map<String, String> oldAliasMap = oldIndex.map(this::getAliases).orElse(null);
logger.info("old index = {} old alias map = {}", oldIndex.orElse(""), oldAliasMap);
final List<String> newAliases = new ArrayList<>();
@ -492,24 +490,31 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
@Override
public boolean forceMerge(IndexDefinition indexDefinition) {
public void forceMerge(IndexDefinition indexDefinition, int maxNumSegments) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return false;
return;
}
if (!indexDefinition.isForceMergeEnabled()) {
return false;
logger.info("force merge is disabled (this is the default in an index definition)");
return;
}
forceMerge(indexDefinition.getFullIndexName(), maxNumSegments);
}
@Override
public void forceMerge(String indexName, int maxNumSegments) {
ensureClientIsPresent();
logger.info("force merge of " + indexDefinition);
logger.info("starting force merge of " + indexName + " to " + maxNumSegments + " segments");
ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
forceMergeRequest.indices(indexDefinition.getFullIndexName());
forceMergeRequest.indices(indexName);
forceMergeRequest.maxNumSegments(maxNumSegments);
ForceMergeResponse forceMergeResponse = client.execute(ForceMergeAction.INSTANCE, forceMergeRequest)
.actionGet();
logger.log(Level.INFO, "after force merge, status = " + forceMergeResponse.getStatus());
if (forceMergeResponse.getFailedShards() > 0) {
throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards());
}
waitForHealthyCluster();
return true;
}
@Override

View file

@ -96,7 +96,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
setForceMerge(forcemerge);
setShardCount(settings.getAsInt("shards", 1));
setReplicaCount(settings.getAsInt("replicas", 1));
String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName);
String fullIndexName = adminClient.resolveAliasFromClusterState(indexName).stream().findFirst().orElse(indexName);
setFullIndexName(fullIndexName);
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(),
Parameters.BULK_START_REFRESH_SECONDS.getInteger()));
@ -118,7 +118,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
setDateTimePattern(dateTimePattern);
String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now());
fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName);
fullIndexName = adminClient.resolveAliasFromClusterState(fullName).stream().findFirst().orElse(fullName);
setFullIndexName(fullIndexName);
boolean prune = settings.getAsBoolean("prune", false);
setPrune(prune);
@ -399,4 +399,9 @@ public class DefaultIndexDefinition implements IndexDefinition {
return null;
}
}
@Override
public String toString() {
return index + "[" + fullIndexName + "]";
}
}

View file

@ -1,6 +1,5 @@
package org.xbib.elx.http.action.admin.indices.alias;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -28,8 +27,7 @@ public class HttpIndicesAliasesAction extends HttpAction<IndicesAliasesRequest,
try {
XContentBuilder builder = JsonXContent.contentBuilder();
request.toXContent(builder, ToXContent.EMPTY_PARAMS);
String body = Strings.toString(builder);
logger.log(Level.DEBUG, "body = " + body);
String body = Strings.toString(builder);
return newPostRequest(url, "/_aliases", body);
} catch (IOException e) {
logger.error(e.getMessage(), e);

View file

@ -3,8 +3,6 @@ package org.xbib.elx.http.action.admin.indices.close;
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
@ -33,13 +31,13 @@ public class HttpCloseIndexAction extends HttpAction<CloseIndexRequest, CloseInd
return this::fromXContent;
}
public CloseIndexResponse fromXContent(XContentParser parser) throws IOException {
AcknowledgedResponse acknowledgedResponse = CloseIndexResponse.fromXContent(parser);
if (parser.currentToken() == null) {
parser.nextToken();
}
boolean shardAcknowledged = true;
List<CloseIndexResponse.IndexResult> list = new LinkedList<>();
return new CloseIndexResponse(acknowledgedResponse.isAcknowledged(), shardAcknowledged, list);
public CloseIndexResponse fromXContent(XContentParser parser) throws IOException {
AcknowledgedResponse acknowledgedResponse = CloseIndexResponse.fromXContent(parser);
if (parser.currentToken() == null) {
parser.nextToken();
}
boolean shardAcknowledged = true;
List<CloseIndexResponse.IndexResult> list = new LinkedList<>();
return new CloseIndexResponse(acknowledgedResponse.isAcknowledged(), shardAcknowledged, list);
}
}

View file

@ -0,0 +1,47 @@
package org.xbib.elx.http.action.admin.indices.forcemerge;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
public class HttpForceMergeAction extends HttpAction<ForceMergeRequest, ForceMergeResponse> {
@Override
public ActionType<ForceMergeResponse> getActionInstance() {
return ForceMergeAction.INSTANCE;
}
@Override
protected HttpRequestBuilder createHttpRequest(String url, ForceMergeRequest request) throws IOException {
List<String> params = new ArrayList<>();
if (request.maxNumSegments() > 0) {
params.add("max_num_segments=" + request.maxNumSegments());
}
if (request.onlyExpungeDeletes()) {
params.add("only_expunge_deletes=true");
}
if (!request.flush()) {
params.add("flush=false");
}
String paramStr = "";
if (!params.isEmpty()) {
paramStr = "?" + String.join("&", params);
}
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newPostRequest(url, "/" + index + "/_forcemerge" + paramStr);
}
@Override
protected CheckedFunction<XContentParser, ForceMergeResponse, IOException> entityParser(HttpResponse httpResponse) {
return ForceMergeResponse::fromXContent;
}
}

View file

@ -0,0 +1,45 @@
package org.xbib.elx.http.action.admin.indices.resolve;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
/**
* Incomplete for now.
*/
public class HttpResolveIndexAction extends HttpAction<ResolveIndexAction.Request, ResolveIndexAction.Response> {
@Override
public ResolveIndexAction getActionInstance() {
return ResolveIndexAction.INSTANCE;
}
@Override
protected HttpRequestBuilder createHttpRequest(String url, ResolveIndexAction.Request request) {
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newGetRequest(url, index + "/_resolve");
}
@Override
protected CheckedFunction<XContentParser, ResolveIndexAction.Response, IOException> entityParser(HttpResponse httpResponse) {
return this::fromXContent;
}
public ResolveIndexAction.Response fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();
}
// TODO parsing
// package private constructor. We need to build a binary stream from XContent parsing
// to use the StreamInput constructor.
List<ResolveIndexAction.ResolvedIndex> resolvedIndices = null;
List<ResolveIndexAction.ResolvedAlias> resolvedAliases = null;
List<ResolveIndexAction.ResolvedDataStream> resolvedDataStreams = null;
return new ResolveIndexAction.Response(resolvedIndices, resolvedAliases, resolvedDataStreams);
}
}

View file

@ -9,8 +9,10 @@ org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction
org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction
org.xbib.elx.http.action.admin.indices.close.HttpCloseIndexAction
org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction
org.xbib.elx.http.action.admin.indices.forcemerge.HttpForceMergeAction
org.xbib.elx.http.action.admin.indices.get.HttpGetIndexAction
org.xbib.elx.http.action.admin.indices.refresh.HttpRefreshIndexAction
org.xbib.elx.http.action.admin.indices.resolve.HttpResolveIndexAction
org.xbib.elx.http.action.admin.indices.settings.get.HttpGetSettingsAction
org.xbib.elx.http.action.admin.indices.settings.put.HttpUpdateSettingsAction
org.xbib.elx.http.action.bulk.HttpBulkAction

View file

@ -1,5 +1,6 @@
package org.xbib.elx.http.test;
import java.util.Collection;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -66,7 +67,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey(indexDefinition.getIndex()));
Optional<String> resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst();
Optional<String> resolved = adminClient.resolveAliasFromClusterState(indexDefinition.getIndex()).stream().findFirst();
aliases = resolved.isPresent() ?
adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a"));
@ -96,7 +97,8 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test").stream().findFirst();
Collection<String> collection = adminClient.resolveAliasFromClusterState("test");
resolved = collection.stream().findFirst();
aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));

View file

@ -0,0 +1,67 @@
package org.xbib.elx.http.test;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.http.HttpAdminClient;
import org.xbib.elx.http.HttpAdminClientProvider;
import org.xbib.elx.http.HttpBulkClient;
import org.xbib.elx.http.HttpBulkClientProvider;
import static org.junit.jupiter.api.Assertions.assertNull;
@ExtendWith(TestExtension.class)
class IndexTest {
private static final Logger logger = LogManager.getLogger(IndexTest.class.getName());
private final TestExtension.Helper helper;
IndexTest(TestExtension.Helper helper) {
this.helper = helper;
}
@Test
void testIndexForceMerge() throws Exception {
try (HttpAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(HttpAdminClientProvider.class)
.put(helper.getClientSettings())
.build();
HttpBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(HttpBulkClientProvider.class)
.put(helper.getClientSettings())
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_force_merge");
indexDefinition.setForceMerge(true);
bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition);
for (int i = 0; i < 1000; i++) {
bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.stopBulk(indexDefinition);
adminClient.forceMerge(indexDefinition, 1);
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
}
}
@Test
void testIndexResolve() throws Exception {
try (HttpAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(HttpAdminClientProvider.class)
.put(helper.getClientSettings())
.build()) {
adminClient.getAlias("testindex");
}
}
}

View file

@ -66,7 +66,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey(indexDefinition.getIndex()));
Optional<String> resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst();
Optional<String> resolved = adminClient.resolveAliasFromClusterState(indexDefinition.getIndex()).stream().findFirst();
aliases = resolved.isPresent() ?
adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a"));
@ -96,7 +96,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test").stream().findFirst();
resolved = adminClient.resolveAliasFromClusterState("test").stream().findFirst();
aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));

View file

@ -0,0 +1,56 @@
package org.xbib.elx.node.test;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider;
import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider;
import static org.junit.jupiter.api.Assertions.assertNull;
@ExtendWith(TestExtension.class)
class IndexTest {
private static final Logger logger = LogManager.getLogger(IndexTest.class.getName());
private final TestExtension.Helper helper;
IndexTest(TestExtension.Helper helper) {
this.helper = helper;
}
@Test
void testIndexForceMerge() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getClientSettings())
.build();
NodeBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getClientSettings())
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_force_merge");
indexDefinition.setForceMerge(true);
bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition);
for (int i = 0; i < 1000; i++) {
bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.stopBulk(indexDefinition);
adminClient.forceMerge(indexDefinition, 1);
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
}
}
}

View file

@ -64,7 +64,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey(indexDefinition.getIndex()));
Optional<String> resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst();
Optional<String> resolved = adminClient.resolveAliasFromClusterState(indexDefinition.getIndex()).stream().findFirst();
aliases = resolved.isPresent() ?
adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a"));
@ -93,7 +93,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test").stream().findFirst();
resolved = adminClient.resolveAliasFromClusterState("test").stream().findFirst();
aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap();
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));

View file

@ -0,0 +1,56 @@
package org.xbib.elx.transport.test;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.DefaultIndexDefinition;
import org.xbib.elx.transport.TransportAdminClient;
import org.xbib.elx.transport.TransportAdminClientProvider;
import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider;
import static org.junit.jupiter.api.Assertions.assertNull;
@ExtendWith(TestExtension.class)
class IndexTest {
private static final Logger logger = LogManager.getLogger(IndexTest.class.getName());
private final TestExtension.Helper helper;
IndexTest(TestExtension.Helper helper) {
this.helper = helper;
}
@Test
void testIndexForceMerge() throws Exception {
try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getClientSettings())
.build();
TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getClientSettings())
.build()) {
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
indexDefinition.setFullIndexName("test_force_merge");
indexDefinition.setForceMerge(true);
bulkClient.newIndex(indexDefinition);
bulkClient.startBulk(indexDefinition);
for (int i = 0; i < 1000; i++) {
bulkClient.index(indexDefinition, helper.randomString(1), false,
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
}
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.stopBulk(indexDefinition);
adminClient.forceMerge(indexDefinition, 1);
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
}
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
}
}
}

View file

@ -1,5 +1,5 @@
group = org.xbib
name = elx
version = 7.10.2.27
version = 7.10.2.28
org.gradle.warning.mode = ALL