add open/close to HTTP API

This commit is contained in:
Jörg Prante 2023-02-10 14:26:54 +01:00
parent 3f8c1ca1d3
commit b269eb7cce
10 changed files with 105 additions and 37 deletions

View file

@ -60,18 +60,19 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
createHttpRequest(httpActionContext.getUrl(), httpActionContext.getRequest());
HttpRequest httpRequest = httpRequestBuilder.build();
httpRequest.setResponseListener(fullHttpResponse -> {
httpActionContext.setHttpResponse(fullHttpResponse);
String content = httpActionContext.getHttpResponse().getBodyAsChars(StandardCharsets.UTF_8).toString();
if (logger.isTraceEnabled()) {
logger.log(Level.TRACE, "got response: " + fullHttpResponse.getStatus().codeAsText() +
" headers = " + fullHttpResponse.getHeaders() +
" content = " + fullHttpResponse.getBodyAsChars(StandardCharsets.UTF_8));
" content = " + content);
}
httpActionContext.setHttpResponse(fullHttpResponse);
if (fullHttpResponse.getStatus().code() == 200) {
listener.onResponse(parseToResponse(httpActionContext));
listener.onResponse(parseToResponse(httpActionContext, content));
} else {
ElasticsearchStatusException statusException = parseToError(httpActionContext);
ElasticsearchStatusException statusException = parseToError(httpActionContext, content);
if (statusException.status().equals(RestStatus.NOT_FOUND)) {
listener.onResponse(parseToResponse(httpActionContext));
listener.onResponse(parseToResponse(httpActionContext, content));
} else {
listener.onFailure(statusException);
}
@ -147,7 +148,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
return HttpRequest.builder(method).setURL(URL.from(baseUrl).resolve(path)).content(content, APPLICATION_JSON);
}
protected T parseToResponse(HttpActionContext<R, T> httpActionContext) {
protected T parseToResponse(HttpActionContext<R, T> httpActionContext, String content) {
String mediaType = httpActionContext.getHttpResponse().getHeaders().get(HttpHeaderNames.CONTENT_TYPE);
XContentType xContentType = XContentType.fromMediaTypeOrFormat(mediaType);
if (xContentType == null) {
@ -156,23 +157,24 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
try (XContentParser parser = xContentType.xContent()
.createParser(httpActionContext.getExtendedHttpClient().getRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
httpActionContext.getHttpResponse().getBodyAsChars(StandardCharsets.UTF_8).toString())) {
content)) {
return entityParser(httpActionContext.getHttpResponse()).apply(parser);
} catch (Throwable e) {
// catch all kinds of errors in the entity parsing process
logger.error(e.getMessage(), e);
logger.error("status = " + httpActionContext.getHttpResponse().getStatus().code());
logger.error("body = " + httpActionContext.getHttpResponse().getBodyAsChars(StandardCharsets.UTF_8));
logger.error("body = " + content);
return null;
}
}
protected ElasticsearchStatusException parseToError(HttpActionContext<R, T> httpActionContext) {
protected ElasticsearchStatusException parseToError(HttpActionContext<R, T> httpActionContext,
String content) {
// we assume a non-empty, valid JSON response body. If there is none, this method must be overriden.
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(httpActionContext.getExtendedHttpClient().getRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
httpActionContext.getHttpResponse().getBodyAsChars(StandardCharsets.UTF_8).toString())) {
content)) {
return errorParser().apply(parser);
} catch (Exception e) {
logger.error(e.getMessage(), e);

View file

@ -2,7 +2,6 @@ package org.xbib.elx.http.action.admin.cluster.state;
import com.carrotsearch.hppc.LongArrayList;
import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -26,6 +25,7 @@ import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -98,7 +98,7 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
if (METADATA.equals(currentFieldName)) {
parser.nextToken();
// MetaData.fromXContent(parser) is broken (Expected [meta-data] as a field name but got cluster_uuid)
// so we have to replace it
// so we have to replace it by our code.
builder.metadata(metadataFromXContent(parser));
}
}
@ -123,8 +123,8 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
builder.persistentSettings(Settings.fromXContent(parser));
} else if ("indices".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
// see IndexMetadata.Builder.fromXContent(parser)
builder.put(indexMetaDataFromXContent(parser), false);
// builder.put(IndexMetadata.Builder.fromXContent(parser), false);
}
} else if ("hashes_of_consistent_settings".equals(currentFieldName)) {
builder.hashesOfConsistentSettings(parser.mapStrings());
@ -145,9 +145,9 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
}
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
// private field
// skip
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
// private field
// skip
} else if ("cluster_uuid_committed".equals(currentFieldName)) {
// skip
} else {
@ -190,9 +190,6 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected object but got a " + token);
}
boolean mappingVersion = false;
boolean settingsVersion = false;
boolean aliasesVersion = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -244,11 +241,6 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
}
}
} else if ("warmers".equals(currentFieldName)) {
// TODO: do this in 6.0:
// throw new IllegalArgumentException("Warmers are not supported anymore - are you upgrading from 1.x?");
// ignore: warmers have been removed in 5.0 and are
// simply ignored when upgrading from 2.x
assert Version.CURRENT.major <= 5;
parser.skipChildren();
} else {
throw new IllegalArgumentException("Unexpected field for an object " + currentFieldName);
@ -288,13 +280,13 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
} else if (KEY_VERSION.equals(currentFieldName)) {
builder.version(parser.longValue());
} else if (KEY_MAPPING_VERSION.equals(currentFieldName)) {
mappingVersion = true;
// mappingVersion = true;
builder.mappingVersion(parser.longValue());
} else if (KEY_SETTINGS_VERSION.equals(currentFieldName)) {
settingsVersion = true;
// settingsVersion = true;
builder.settingsVersion(parser.longValue());
} else if (KEY_ALIASES_VERSION.equals(currentFieldName)) {
aliasesVersion = true;
// aliasesVersion = true;
builder.aliasesVersion(parser.longValue());
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
builder.setRoutingNumShards(parser.intValue());

View file

@ -1,5 +1,6 @@
package org.xbib.elx.http.action.admin.indices.alias.get;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
@ -13,6 +14,7 @@ import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -22,11 +24,9 @@ public class HttpGetAliasAction extends HttpAction<GetAliasesRequest, GetAliases
@Override
protected HttpRequestBuilder createHttpRequest(String url, GetAliasesRequest request) {
// beware of this inconsistency, request.indices() always return empty array
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
String aliases = request.aliases() != null ? String.join(",", request.aliases()) + "/" : "";
// do not add "/" in front of index
return newGetRequest(url, index + "_alias/" + aliases);
String index = request.indices() != null ? String.join(",", request.indices()) : "";
String aliases = request.aliases() != null ? String.join(",", request.aliases()) : "";
return newGetRequest(url, index + "/_alias" + (aliases.isEmpty() ? "" : "/" + aliases));
}
@Override

View file

@ -0,0 +1,45 @@
package org.xbib.elx.http.action.admin.indices.close;
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
public class HttpCloseIndexAction extends HttpAction<CloseIndexRequest, CloseIndexResponse> {
@Override
public CloseIndexAction getActionInstance() {
return CloseIndexAction.INSTANCE;
}
@Override
protected HttpRequestBuilder createHttpRequest(String url, CloseIndexRequest closeIndexRequest) {
return newPostRequest(url, "/" + String.join(",", closeIndexRequest.indices()) + "/_close");
}
@Override
protected CheckedFunction<XContentParser, CloseIndexResponse, IOException> entityParser(HttpResponse httpResponse) {
return this::fromXContent;
}
public CloseIndexResponse fromXContent(XContentParser parser) throws IOException {
AcknowledgedResponse acknowledgedResponse = CloseIndexResponse.fromXContent(parser);
if (parser.currentToken() == null) {
parser.nextToken();
}
boolean shardAcknowledged = true;
List<CloseIndexResponse.IndexResult> list = new LinkedList<>();
return new CloseIndexResponse(acknowledgedResponse.isAcknowledged(), shardAcknowledged, list);
}
}

View file

@ -33,7 +33,7 @@ public class HttpIndicesExistsAction extends HttpAction<IndicesExistsRequest, In
* @return the ELasticsearch sttatus exception
*/
@Override
protected ElasticsearchStatusException parseToError(HttpActionContext<IndicesExistsRequest, IndicesExistsResponse> httpActionContext) {
protected ElasticsearchStatusException parseToError(HttpActionContext<IndicesExistsRequest, IndicesExistsResponse> httpActionContext, String content) {
return new ElasticsearchStatusException("not found", RestStatus.NOT_FOUND);
}

View file

@ -13,7 +13,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class HtppGetIndexAction extends HttpAction<GetIndexRequest, GetIndexResponse> {
public class HttpGetIndexAction extends HttpAction<GetIndexRequest, GetIndexResponse> {
@Override
public GetIndexAction getActionInstance() {
@ -25,7 +25,6 @@ public class HtppGetIndexAction extends HttpAction<GetIndexRequest, GetIndexResp
List<String> list = getIndexRequest.indices().length == 0 ?
List.of("*") : Arrays.asList(getIndexRequest.indices());
String command = "/" + String.join(",", list);
logger.info("command = " + command);
return newGetRequest(url, command);
}

View file

@ -0,0 +1,30 @@
package org.xbib.elx.http.action.admin.indices.open;
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.net.http.client.HttpResponse;
import org.xbib.net.http.client.netty.HttpRequestBuilder;
import java.io.IOException;
public class HttpOpenIndexAction extends HttpAction<OpenIndexRequest, OpenIndexResponse> {
@Override
public OpenIndexAction getActionInstance() {
return OpenIndexAction.INSTANCE;
}
@Override
protected HttpRequestBuilder createHttpRequest(String url, OpenIndexRequest openIndexRequest) {
return newPostRequest(url, "/" + String.join(",", openIndexRequest.indices()) + "/_open");
}
@Override
protected CheckedFunction<XContentParser, OpenIndexResponse, IOException> entityParser(HttpResponse httpResponse) {
return OpenIndexResponse::fromXContent;
}
}

View file

@ -7,8 +7,9 @@ org.xbib.elx.http.action.admin.indices.alias.HttpIndicesAliasesAction
org.xbib.elx.http.action.admin.indices.alias.get.HttpGetAliasAction
org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction
org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction
org.xbib.elx.http.action.admin.indices.close.HttpCloseIndexAction
org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction
org.xbib.elx.http.action.admin.indices.get.HtppGetIndexAction
org.xbib.elx.http.action.admin.indices.get.HttpGetIndexAction
org.xbib.elx.http.action.admin.indices.refresh.HttpRefreshIndexAction
org.xbib.elx.http.action.admin.indices.settings.get.HttpGetSettingsAction
org.xbib.elx.http.action.admin.indices.settings.put.HttpUpdateSettingsAction

View file

@ -93,7 +93,6 @@ class IndexPruneTest {
}
@Test
@Disabled("internal error")
void testPruneWithClose() throws IOException {
try (HttpAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(HttpAdminClientProvider.class)

View file

@ -1,5 +1,5 @@
group = org.xbib
name = elx
version = 7.10.2.25
version = 7.10.2.26
org.gradle.warning.mode = ALL