add flush and refresh action to index shift operation

This commit is contained in:
Jörg Prante 2024-08-02 09:31:25 +02:00
parent b61ef22086
commit a8cd5591af
8 changed files with 75 additions and 33 deletions

View file

@ -55,5 +55,19 @@ public interface BasicClient extends Closeable {
boolean isIndexClosed(IndexDefinition indexDefinition); boolean isIndexClosed(IndexDefinition indexDefinition);
/**
* Flush the index. The cluster clears cache and completes indexing.
*
* @param indexDefinition index definition
*/
void flushIndex(IndexDefinition indexDefinition);
/**
* Refresh the index. The cluster will flush the index and prepare for search.
*
* @param indexDefinition index definition
*/
void refreshIndex(IndexDefinition indexDefinition);
ScheduledExecutorService getScheduler(); ScheduledExecutorService getScheduler();
} }

View file

@ -130,19 +130,5 @@ public interface BulkClient extends BasicClient, Flushable {
*/ */
void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit); void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit);
/**
* Refresh the index.
*
* @param indexDefinition index definition
*/
void refreshIndex(IndexDefinition indexDefinition);
/**
* Flush the index. The cluster clears cache and completes indexing.
*
* @param indexDefinition index definition
*/
void flushIndex(IndexDefinition indexDefinition);
BulkProcessor getBulkProcessor(); BulkProcessor getBulkProcessor();
} }

View file

@ -351,8 +351,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return new EmptyIndexShiftResult(); return new EmptyIndexShiftResult();
} }
if (indexDefinition.isShiftEnabled()) { if (indexDefinition.isShiftEnabled()) {
logger.log(Level.INFO, "before shift, flushing index " + indexDefinition);
flushIndex(indexDefinition);
logger.log(Level.INFO, "before shift, refreshing index " + indexDefinition);
refreshIndex(indexDefinition);
if (indexDefinition.isShiftNotEmpty() && isIndexEmpty(indexDefinition)) { if (indexDefinition.isShiftNotEmpty() && isIndexEmpty(indexDefinition)) {
logger.log(Level.WARNING, "index is empty, deleting index, disabling definition, rejecting to continue shifting: " + logger.log(Level.WARNING, "something is wrong, the index is empty. Deleting index, disabling definition, rejecting to continue shifting: " +
indexDefinition); indexDefinition);
deleteIndex(indexDefinition); deleteIndex(indexDefinition);
indexDefinition.setEnabled(false); indexDefinition.setEnabled(false);
@ -360,6 +364,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return new EmptyIndexShiftResult(); return new EmptyIndexShiftResult();
} }
if (indexDefinition.isCloseShifted()) { if (indexDefinition.isCloseShifted()) {
logger.log(Level.INFO, "before shift, closing all previous indices of " + indexDefinition);
getAlias(indexDefinition.getIndex()).stream() getAlias(indexDefinition.getIndex()).stream()
.filter(s -> !s.equals(indexDefinition.getFullIndexName())) .filter(s -> !s.equals(indexDefinition.getFullIndexName()))
.forEach(this::closeIndex); .forEach(this::closeIndex);

View file

@ -16,6 +16,10 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
@ -256,6 +260,24 @@ public abstract class AbstractBasicClient implements BasicClient {
return "OPEN".equals(state); return "OPEN".equals(state);
} }
@Override
public void flushIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return;
}
ensureClientIsPresent();
client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet();
}
@Override
public void refreshIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return;
}
ensureClientIsPresent();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet();
}
protected abstract ElasticsearchClient createClient(Settings settings); protected abstract ElasticsearchClient createClient(Settings settings);
protected abstract void closeClient(Settings settings); protected abstract void closeClient(Settings settings);

View file

@ -251,21 +251,4 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
super.updateIndexSetting(index, key, value, timeout, timeUnit); super.updateIndexSetting(index, key, value, timeout, timeUnit);
} }
@Override
public void flushIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return;
}
ensureClientIsPresent();
client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet();
}
@Override
public void refreshIndex(IndexDefinition indexDefinition) {
if (isIndexDefinitionDisabled(indexDefinition)) {
return;
}
ensureClientIsPresent();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet();
}
} }

View file

@ -0,0 +1,31 @@
package org.xbib.elx.http.action.admin.indices.flush;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
import java.io.IOException;
public class HttpFlushIndexAction extends HttpAction<FlushRequest, FlushResponse> {
@Override
public FlushAction getActionInstance() {
return FlushAction.INSTANCE;
}
@Override
protected HttpRequestBuilder createHttpRequest(String url, FlushRequest request) {
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newPostRequest(url, "/" + index + "_flush");
}
@Override
protected CheckedFunction<XContentParser, FlushResponse, IOException> entityParser(HttpResponse httpResponse) {
return FlushResponse::fromXContent;
}
}

View file

@ -8,6 +8,7 @@ org.xbib.elx.http.action.admin.indices.close.HttpCloseIndexAction
org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction
org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction
org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction
org.xbib.elx.http.action.admin.indices.flush.HttpFlushIndexAction
org.xbib.elx.http.action.admin.indices.forcemerge.HttpForceMergeAction org.xbib.elx.http.action.admin.indices.forcemerge.HttpForceMergeAction
org.xbib.elx.http.action.admin.indices.get.HttpGetIndexAction org.xbib.elx.http.action.admin.indices.get.HttpGetIndexAction
org.xbib.elx.http.action.admin.indices.mapping.get.HttpGetMappingsAction org.xbib.elx.http.action.admin.indices.mapping.get.HttpGetMappingsAction

View file

@ -1,3 +1,3 @@
group = org.xbib group = org.xbib
name = elx name = elx
version = 7.10.2.47 version = 7.10.2.48