From a8cd5591aff357c14be44805cc861739a2547a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Fri, 2 Aug 2024 09:31:25 +0200 Subject: [PATCH] add flush and refresh action to index shift operation --- .../java/org/xbib/elx/api/BasicClient.java | 14 +++++++++ .../java/org/xbib/elx/api/BulkClient.java | 14 --------- .../xbib/elx/common/AbstractAdminClient.java | 7 ++++- .../xbib/elx/common/AbstractBasicClient.java | 22 +++++++++++++ .../xbib/elx/common/AbstractBulkClient.java | 17 ---------- .../indices/flush/HttpFlushIndexAction.java | 31 +++++++++++++++++++ .../services/org.xbib.elx.http.HttpAction | 1 + gradle.properties | 2 +- 8 files changed, 75 insertions(+), 33 deletions(-) create mode 100644 elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/flush/HttpFlushIndexAction.java diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index 0f3f560..93dc679 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -55,5 +55,19 @@ public interface BasicClient extends Closeable { boolean isIndexClosed(IndexDefinition indexDefinition); + /** + * Flush the index. The cluster clears cache and completes indexing. + * + * @param indexDefinition index definition + */ + void flushIndex(IndexDefinition indexDefinition); + + /** + * Refresh the index. The cluster will flush the index and prepare for search. + * + * @param indexDefinition index definition + */ + void refreshIndex(IndexDefinition indexDefinition); + ScheduledExecutorService getScheduler(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 5c9ce79..c3cedda 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -130,19 +130,5 @@ public interface BulkClient extends BasicClient, Flushable { */ void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit); - /** - * Refresh the index. - * - * @param indexDefinition index definition - */ - void refreshIndex(IndexDefinition indexDefinition); - - /** - * Flush the index. The cluster clears cache and completes indexing. - * - * @param indexDefinition index definition - */ - void flushIndex(IndexDefinition indexDefinition); - BulkProcessor getBulkProcessor(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 0252982..c5158c2 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -351,8 +351,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return new EmptyIndexShiftResult(); } if (indexDefinition.isShiftEnabled()) { + logger.log(Level.INFO, "before shift, flushing index " + indexDefinition); + flushIndex(indexDefinition); + logger.log(Level.INFO, "before shift, refreshing index " + indexDefinition); + refreshIndex(indexDefinition); if (indexDefinition.isShiftNotEmpty() && isIndexEmpty(indexDefinition)) { - logger.log(Level.WARNING, "index is empty, deleting index, disabling definition, rejecting to continue shifting: " + + logger.log(Level.WARNING, "something is wrong, the index is empty. Deleting index, disabling definition, rejecting to continue shifting: " + indexDefinition); deleteIndex(indexDefinition); indexDefinition.setEnabled(false); @@ -360,6 +364,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return new EmptyIndexShiftResult(); } if (indexDefinition.isCloseShifted()) { + logger.log(Level.INFO, "before shift, closing all previous indices of " + indexDefinition); getAlias(indexDefinition.getIndex()).stream() .filter(s -> !s.equals(indexDefinition.getFullIndexName())) .forEach(this::closeIndex); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 27680cc..a8b6ec2 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -16,6 +16,10 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.flush.FlushAction; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.search.SearchAction; @@ -256,6 +260,24 @@ public abstract class AbstractBasicClient implements BasicClient { return "OPEN".equals(state); } + @Override + public void flushIndex(IndexDefinition indexDefinition) { + if (isIndexDefinitionDisabled(indexDefinition)) { + return; + } + ensureClientIsPresent(); + client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet(); + } + + @Override + public void refreshIndex(IndexDefinition indexDefinition) { + if (isIndexDefinitionDisabled(indexDefinition)) { + return; + } + ensureClientIsPresent(); + client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet(); + } + protected abstract ElasticsearchClient createClient(Settings settings); protected abstract void closeClient(Settings settings); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 5964fe0..eeb0bec 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -251,21 +251,4 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements super.updateIndexSetting(index, key, value, timeout, timeUnit); } - @Override - public void flushIndex(IndexDefinition indexDefinition) { - if (isIndexDefinitionDisabled(indexDefinition)) { - return; - } - ensureClientIsPresent(); - client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet(); - } - - @Override - public void refreshIndex(IndexDefinition indexDefinition) { - if (isIndexDefinitionDisabled(indexDefinition)) { - return; - } - ensureClientIsPresent(); - client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet(); - } } diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/flush/HttpFlushIndexAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/flush/HttpFlushIndexAction.java new file mode 100644 index 0000000..e265e14 --- /dev/null +++ b/elx-http/src/main/java/org/xbib/elx/http/action/admin/indices/flush/HttpFlushIndexAction.java @@ -0,0 +1,31 @@ +package org.xbib.elx.http.action.admin.indices.flush; + +import org.elasticsearch.action.admin.indices.flush.FlushAction; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.xcontent.XContentParser; +import org.xbib.elx.http.HttpAction; +import org.xbib.net.http.client.HttpResponse; +import org.xbib.net.http.client.netty.HttpRequestBuilder; + +import java.io.IOException; + +public class HttpFlushIndexAction extends HttpAction { + + @Override + public FlushAction getActionInstance() { + return FlushAction.INSTANCE; + } + + @Override + protected HttpRequestBuilder createHttpRequest(String url, FlushRequest request) { + String index = request.indices() != null ? String.join(",", request.indices()) + "/" : ""; + return newPostRequest(url, "/" + index + "_flush"); + } + + @Override + protected CheckedFunction entityParser(HttpResponse httpResponse) { + return FlushResponse::fromXContent; + } +} diff --git a/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction b/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction index 33b2fbb..9ad48d1 100644 --- a/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction +++ b/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction @@ -8,6 +8,7 @@ org.xbib.elx.http.action.admin.indices.close.HttpCloseIndexAction org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction +org.xbib.elx.http.action.admin.indices.flush.HttpFlushIndexAction org.xbib.elx.http.action.admin.indices.forcemerge.HttpForceMergeAction org.xbib.elx.http.action.admin.indices.get.HttpGetIndexAction org.xbib.elx.http.action.admin.indices.mapping.get.HttpGetMappingsAction diff --git a/gradle.properties b/gradle.properties index caa995d..458fe3e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ group = org.xbib name = elx -version = 7.10.2.47 +version = 7.10.2.48