make HTTP client work

This commit is contained in:
Jörg Prante 2019-05-06 11:28:58 +02:00
parent d4e40145be
commit 49b35226c2
36 changed files with 1000 additions and 166 deletions

View file

@ -95,6 +95,7 @@ subprojects {
clean {
delete "out"
delete "null"
}
/*javadoc {

View file

@ -33,9 +33,6 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@ -238,7 +235,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public String getClusterName() {
ensureActive();
try {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
@ -588,7 +586,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public String resolveAlias(String alias) {
ensureActive();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.blocks(false);
clusterStateRequest.metaData(true);
clusterStateRequest.nodes(false);
clusterStateRequest.routingTable(false);
clusterStateRequest.customs(false);
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
SortedMap<String, AliasOrIndex> map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup();

View file

@ -0,0 +1,61 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class HttpGetMappingsAction extends HttpAction<GetMappingsRequest, GetMappingsResponse> {
private static final ParseField MAPPINGS = new ParseField("mappings");
@Override
public GetMappingsAction getActionInstance() {
return GetMappingsAction.INSTANCE;
}
@Override
protected RequestBuilder createHttpRequest(String url, GetMappingsRequest request) {
String index = request.indices() != null ? "/" + String.join(",", request.indices()) : "";
return newGetRequest(url, index + "/_mapping");
}
@Override
protected CheckedFunction<XContentParser, GetMappingsResponse, IOException> entityParser() {
return this::fromXContent;
}
@Override
protected GetMappingsResponse emptyResponse() {
return new GetMappingsResponse();
}
@SuppressWarnings("unchecked")
private GetMappingsResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
Map<String, Object> parts = parser.map();
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> builder = new ImmutableOpenMap.Builder<>();
for (Map.Entry<String, Object> entry : parts.entrySet()) {
String indexName = entry.getKey();
Map<String, Object> mapping = (Map<String, Object>) ((Map) entry.getValue()).get(MAPPINGS.getPreferredName());
ImmutableOpenMap.Builder<String, MappingMetaData> typeBuilder = new ImmutableOpenMap.Builder<>();
for (Map.Entry<String, Object> typeEntry : mapping.entrySet()) {
String typeName = typeEntry.getKey();
Map<String, Object> fieldMappings = (Map<String, Object>) typeEntry.getValue();
MappingMetaData mmd = new MappingMetaData(typeName, fieldMappings);
typeBuilder.put(typeName, mmd);
}
builder.put(indexName, typeBuilder.build());
}
return new GetMappingsResponse(builder.build());
}
}

View file

@ -1,5 +1,6 @@
package org.xbib.elx.http;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.Action;
@ -79,7 +80,7 @@ public class ExtendedHttpClient extends AbstractExtendedClient implements Elasti
clientBuilder.enableDebug();
}
this.nettyHttpClient = clientBuilder.build();
logger.info("extended HTTP client initialized, settings = {}, url = {}, {} actions",
logger.log(Level.DEBUG, "extended HTTP client initialized, settings = {}, url = {}, {} actions",
settings, url, actionMap.size());
return this;
}
@ -136,7 +137,7 @@ public class ExtendedHttpClient extends AbstractExtendedClient implements Elasti
@Override
public ThreadPool threadPool() {
logger.info("returning null for threadPool() request");
logger.log(Level.DEBUG, "returning null for threadPool() request");
return null;
}
@ -149,8 +150,8 @@ public class ExtendedHttpClient extends AbstractExtendedClient implements Elasti
}
try {
HttpActionContext httpActionContext = new HttpActionContext(this, request, url);
logger.log(Level.DEBUG, "url = " + url);
httpAction.execute(httpActionContext, listener);
logger.debug("submitted to URL {}", url);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}

View file

@ -3,8 +3,11 @@ package org.xbib.elx.http;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
@ -14,8 +17,11 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.RequestBuilder;
import org.xbib.netty.http.client.transport.Transport;
@ -33,7 +39,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
protected final Logger logger = LogManager.getLogger(getClass().getName());
protected static final String APPLICATION_JSON = "application/json";
private static final String APPLICATION_JSON = "application/json";
protected Settings settings;
@ -50,7 +56,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
return future;
}*/
public final void execute(HttpActionContext<R, T> httpActionContext, ActionListener<T> listener) throws IOException {
final void execute(HttpActionContext<R, T> httpActionContext, ActionListener<T> listener) throws IOException {
try {
ActionRequestValidationException validationException = httpActionContext.getRequest().validate();
if (validationException != null) {
@ -59,22 +65,29 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
}
RequestBuilder httpRequestBuilder =
createHttpRequest(httpActionContext.getUrl(), httpActionContext.getRequest());
//httpRequestBuilder.addHeader("content-type", "application/json");
Request httpRequest = httpRequestBuilder.build();
// logger.info("action = {} request = {}", this.getClass().getName(), httpRequest.toString());
logger.log(Level.DEBUG, "action = {} request = {}", this.getClass().getName(), httpRequest.toString());
httpRequest.setResponseListener(fullHttpResponse -> {
logger.info("returned response " + fullHttpResponse.status().code() +
logger.log(Level.DEBUG, "got response: " + fullHttpResponse.status().code() +
" headers = " + fullHttpResponse.headers().entries() +
" content = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8));
listener.onResponse(parseToResponse(httpActionContext.setHttpResponse(fullHttpResponse)));
httpActionContext.setHttpResponse(fullHttpResponse);
if (fullHttpResponse.status().equals(HttpResponseStatus.OK)) {
listener.onResponse(parseToResponse(httpActionContext));
} else {
ElasticsearchStatusException statusException = parseToError(httpActionContext);
if (statusException.status().equals(RestStatus.NOT_FOUND)) {
listener.onResponse(emptyResponse());
} else {
listener.onFailure(statusException);
}
}
});
Transport transport = httpActionContext.getExtendedHttpClient().internalClient().execute(httpRequest);
logger.info("transport = " + transport);
httpActionContext.setHttpClientTransport(transport);
if (transport.isFailed()) {
listener.onFailure(new Exception(transport.getFailure()));
}
logger.info("done, listener is " + listener);
} catch (Throwable e) {
listener.onFailure(new RuntimeException(e));
throw new IOException(e);
@ -82,7 +95,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
}
protected RequestBuilder newGetRequest(String url, String path) {
return Request.builder(HttpMethod.GET).url(url).uri(path);
return newRequest(HttpMethod.GET, url, path);
}
protected RequestBuilder newGetRequest(String url, String path, BytesReference content) {
@ -117,6 +130,10 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
return newRequest(HttpMethod.PUT, url, path, content);
}
protected RequestBuilder newDeleteRequest(String url, String path) {
return newRequest(HttpMethod.DELETE, url, path);
}
protected RequestBuilder newDeleteRequest(String url, String path, BytesReference content) {
return newRequest(HttpMethod.DELETE, url, path, content);
}
@ -146,7 +163,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
try (XContentParser parser = xContentType.xContent()
.createParser(httpActionContext.getExtendedHttpClient().getRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
httpActionContext.getHttpResponse().content().array())) {
httpActionContext.getHttpResponse().content().toString(StandardCharsets.UTF_8))) {
return entityParser().apply(parser);
} catch (IOException e) {
logger.error(e.getMessage(), e);
@ -154,8 +171,26 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
}
}
protected ElasticsearchStatusException parseToError(HttpActionContext<R, T> httpActionContext) {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(httpActionContext.getExtendedHttpClient().getRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
httpActionContext.getHttpResponse().content().toString(StandardCharsets.UTF_8))) {
return errorParser().apply(parser);
} catch (IOException e) {
logger.error(e.getMessage(), e);
return new ElasticsearchStatusException(e.getMessage(), RestStatus.INTERNAL_SERVER_ERROR, e);
}
}
protected CheckedFunction<XContentParser, ElasticsearchStatusException, IOException> errorParser() {
return BytesRestResponse::errorFromXContent;
}
protected abstract RequestBuilder createHttpRequest(String baseUrl, R request) throws IOException;
protected abstract CheckedFunction<XContentParser, T, IOException> entityParser();
protected abstract T emptyResponse();
}

View file

@ -3,23 +3,12 @@ package org.xbib.elx.http.action.admin.cluster.health;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class HttpClusterHealthAction extends HttpAction<ClusterHealthRequest, ClusterHealthResponse> {
@ -30,106 +19,16 @@ public class HttpClusterHealthAction extends HttpAction<ClusterHealthRequest, Cl
@Override
protected RequestBuilder createHttpRequest(String url, ClusterHealthRequest request) {
return newPutRequest(url, "/_cluster/health");
return newGetRequest(url, "/_cluster/health");
}
@Override
protected CheckedFunction<XContentParser, ClusterHealthResponse, IOException> entityParser() {
throw new UnsupportedOperationException();
return HttpClusterHealthResponse::fromXContent;
}
private static final String CLUSTER_NAME = "cluster_name";
private static final String STATUS = "status";
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis";
private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number";
private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String INDICES = "indices";
private static final ConstructingObjectParser<ClusterHealthResponse, Void> PARSER =
new ConstructingObjectParser<>("cluster_health_response", true,
parsedObjects -> {
int i = 0;
// ClusterStateHealth fields
int numberOfNodes = (int) parsedObjects[i++];
int numberOfDataNodes = (int) parsedObjects[i++];
int activeShards = (int) parsedObjects[i++];
int relocatingShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
int initializingShards = (int) parsedObjects[i++];
int unassignedShards = (int) parsedObjects[i++];
double activeShardsPercent = (double) parsedObjects[i++];
String statusStr = (String) parsedObjects[i++];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
@SuppressWarnings("unchecked") List<ClusterIndexHealth> indexList =
(List<ClusterIndexHealth>) parsedObjects[i++];
final Map<String, ClusterIndexHealth> indices;
if (indexList == null || indexList.isEmpty()) {
indices = emptyMap();
} else {
indices = new HashMap<>(indexList.size());
for (ClusterIndexHealth indexHealth : indexList) {
indices.put(indexHealth.getIndex(), indexHealth);
}
}
/*ClusterStateHealth stateHealth = new ClusterStateHealth(activePrimaryShards, activeShards, relocatingShards,
initializingShards, unassignedShards, numberOfNodes, numberOfDataNodes, activeShardsPercent, status,
indices);*/
//ClusterState clusterState = new ClusterState();
//ClusterStateHealth clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
// ClusterHealthResponse fields
String clusterName = (String) parsedObjects[i++];
int numberOfPendingTasks = (int) parsedObjects[i++];
int numberOfInFlightFetch = (int) parsedObjects[i++];
int delayedUnassignedShards = (int) parsedObjects[i++];
long taskMaxWaitingTimeMillis = (long) parsedObjects[i++];
boolean timedOut = (boolean) parsedObjects[i];
return new ClusterHealthResponse(clusterName, null, null, numberOfPendingTasks,
numberOfInFlightFetch, delayedUnassignedShards,
TimeValue.timeValueMillis(taskMaxWaitingTimeMillis));
/*return new ClusterHealthResponse(clusterName, numberOfPendingTasks, numberOfInFlightFetch,
delayedUnassignedShards,
TimeValue.timeValueMillis(taskMaxWaitingTimeMillis), timedOut, stateHealth);*/
});
// private static final ObjectParser.NamedObjectParser<ClusterIndexHealth, Void> INDEX_PARSER =
// (XContentParser parser, Void context, String index) -> ClusterIndexHealth.innerFromXContent(parser, index);
static {
// ClusterStateHealth fields
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_NODES));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_DATA_NODES));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareDouble(constructorArg(), new ParseField(ACTIVE_SHARDS_PERCENT_AS_NUMBER));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
// Can be absent if LEVEL == 'cluster'
//PARSER.declareNamedObjects(optionalConstructorArg(), INDEX_PARSER, new ParseField(INDICES));
// ClusterHealthResponse fields
PARSER.declareString(constructorArg(), new ParseField(CLUSTER_NAME));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_PENDING_TASKS));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_IN_FLIGHT_FETCH));
PARSER.declareInt(constructorArg(), new ParseField(DELAYED_UNASSIGNED_SHARDS));
PARSER.declareLong(constructorArg(), new ParseField(TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS));
PARSER.declareBoolean(constructorArg(), new ParseField(TIMED_OUT));
@Override
protected ClusterHealthResponse emptyResponse() {
return new HttpClusterHealthResponse();
}
}

View file

@ -0,0 +1,215 @@
package org.xbib.elx.http.action.admin.cluster.health;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class HttpClusterHealthResponse extends ClusterHealthResponse {
private static final String CLUSTER_NAME = "cluster_name";
private static final String STATUS = "status";
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis";
private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number";
private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String INDICES = "indices";
private String clusterName;
private ClusterStateHealth clusterStateHealth;
private boolean timedOut;
private int delayedUnassignedShards;
private int numberOfPendingTasks;
private int numberOfInFlightFetch;
public HttpClusterHealthResponse() {
}
private void init(String clusterName,
ClusterHealthStatus clusterHealthStatus,
boolean timedOut,
int numberOfNodes,
int numberOfDataNodes,
Map<String, ClusterIndexHealth> indices,
int activePrimaryShards,
int activeShards,
int relocatingShards,
int initializingShards,
int unassignedShards,
int delayedUnassignedShards,
int numberOfPendingTasks, int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime,
double activeShardsPercent) throws IOException {
this.clusterName = clusterName;
BytesStreamOutput streamOutput = new BytesStreamOutput();
streamOutput.writeVInt(activePrimaryShards);
streamOutput.writeVInt(activeShards);
streamOutput.writeVInt(relocatingShards);
streamOutput.writeVInt(initializingShards);
streamOutput.writeVInt(unassignedShards);
streamOutput.writeVInt(numberOfNodes);
streamOutput.writeVInt(numberOfDataNodes);
streamOutput.writeByte(clusterHealthStatus.value());
streamOutput.writeVInt(indices.size());
for (ClusterIndexHealth indexHealth : indices.values()) {
indexHealth.writeTo(streamOutput);
}
streamOutput.writeDouble(activeShardsPercent);
this.clusterStateHealth = new ClusterStateHealth(streamOutput.bytes().streamInput());
this.timedOut = timedOut;
this.delayedUnassignedShards = delayedUnassignedShards;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
}
@Override
public String getClusterName() {
return clusterName;
}
@Override
public ClusterStateHealth getClusterStateHealth() {
return clusterStateHealth;
}
@Override
public boolean isTimedOut() {
return this.timedOut;
}
@Override
public int getActiveShards() {
return clusterStateHealth.getActiveShards();
}
@Override
public int getRelocatingShards() {
return clusterStateHealth.getRelocatingShards();
}
@Override
public int getActivePrimaryShards() {
return clusterStateHealth.getActivePrimaryShards();
}
@Override
public int getInitializingShards() {
return clusterStateHealth.getInitializingShards();
}
@Override
public int getUnassignedShards() {
return clusterStateHealth.getUnassignedShards();
}
@Override
public int getDelayedUnassignedShards() {
return delayedUnassignedShards;
}
@Override
public int getNumberOfNodes() {
return clusterStateHealth.getNumberOfNodes();
}
@Override
public int getNumberOfDataNodes() {
return clusterStateHealth.getNumberOfDataNodes();
}
@Override
public int getNumberOfPendingTasks() {
return numberOfPendingTasks;
}
@Override
public int getNumberOfInFlightFetch() {
return numberOfInFlightFetch;
}
@Override
public ClusterHealthStatus getStatus() {
return clusterStateHealth.getStatus();
}
@Override
public Map<String, ClusterIndexHealth> getIndices() {
return clusterStateHealth.getIndices();
}
@Override
public double getActiveShardsPercent() {
return clusterStateHealth.getActiveShardsPercent();
}
@Override
public RestStatus status() {
return isTimedOut() ? RestStatus.REQUEST_TIMEOUT : RestStatus.OK;
}
public static HttpClusterHealthResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
Map<String, Object> map = parser.map();
String clusterName = (String) map.get(CLUSTER_NAME);
ClusterHealthStatus status = ClusterHealthStatus.fromString((String) map.get(STATUS));
Boolean timedOut = (Boolean) map.get(TIMED_OUT);
Integer numberOfNodes = (Integer) map.get(NUMBER_OF_NODES);
Integer numberOfDataNodes = (Integer) map.get(NUMBER_OF_DATA_NODES);
Integer activePrimaryShards = (Integer) map.get(ACTIVE_PRIMARY_SHARDS);
Integer activeShards = (Integer) map.get(ACTIVE_SHARDS);
Integer relocatingShards = (Integer) map.get(RELOCATING_SHARDS);
Integer initializingShards = (Integer) map.get(INITIALIZING_SHARDS);
Integer unassignedShards = (Integer) map.get(UNASSIGNED_SHARDS);
Integer delayedUnassignedShards = (Integer) map.get(DELAYED_UNASSIGNED_SHARDS);
Integer numberOfPendingTasks = (Integer) map.get(NUMBER_OF_PENDING_TASKS);
Integer numberOfInFlightFetch = (Integer) map.get(NUMBER_OF_IN_FLIGHT_FETCH);
Integer taskMaxWaitingInQueueMillis = (Integer) map.get(TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS);
Double activeShardsPercentAsNumber = (Double) map.get(ACTIVE_SHARDS_PERCENT_AS_NUMBER);
HttpClusterHealthResponse clusterHealthResponse = new HttpClusterHealthResponse();
clusterHealthResponse.init(clusterName,
status,
timedOut,
numberOfNodes,
numberOfDataNodes,
Collections.emptyMap(),
activePrimaryShards,
activeShards,
relocatingShards,
initializingShards,
unassignedShards,
delayedUnassignedShards,
numberOfPendingTasks,
numberOfInFlightFetch,
TimeValue.timeValueMillis(taskMaxWaitingInQueueMillis),
activeShardsPercentAsNumber
);
return clusterHealthResponse;
}
}

View file

@ -99,10 +99,14 @@ public class HttpNodesInfoAction extends HttpAction<NodesInfoRequest, NodesInfoR
throw new UnsupportedOperationException();
}
@Override
protected NodesInfoResponse emptyResponse() {
return new NodesInfoResponse();
}
@SuppressWarnings("unchecked")
protected NodesInfoResponse createResponse(HttpActionContext<NodesInfoRequest, NodesInfoResponse> httpContext) {
Map<String, Object> map = null;
String string = (String)map.get("cluster_name");
ClusterName clusterName = new ClusterName(string);
List<NodeInfo> nodeInfoList = new LinkedList<>();

View file

@ -43,4 +43,9 @@ public class HttpClusterUpdateSettingsAction extends HttpAction<ClusterUpdateSet
protected CheckedFunction<XContentParser, ClusterUpdateSettingsResponse, IOException> entityParser() {
return ClusterUpdateSettingsResponse::fromXContent;
}
@Override
protected ClusterUpdateSettingsResponse emptyResponse() {
return new ClusterUpdateSettingsResponse();
}
}

View file

@ -1,17 +1,47 @@
package org.xbib.elx.http.action.admin.cluster.state;
import com.carrotsearch.hppc.LongArrayList;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedObjectNotFoundException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, ClusterStateResponse> {
private static final String CLUSTER_NAME = "cluster_name";
private static final String COMPRESSED_SIZE_IN_BYTES = "compressed_size_in_bytes";
private static final String BLOCKS = "blocks";
private static final String NODES = "nodes";
private static final String METADATA = "metadata";
@Override
public ClusterStateAction getActionInstance() {
return ClusterStateAction.INSTANCE;
@ -19,11 +49,232 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
@Override
protected RequestBuilder createHttpRequest(String url, ClusterStateRequest request) {
return newPutRequest(url, "/_cluster/state");
List<String> list = new ArrayList<>();
if (request.metaData()) {
list.add("metadata");
}
if (request.blocks()) {
list.add("blocks");
}
if (request.nodes()) {
list.add("nodes");
}
if (request.routingTable()) {
list.add("routing_table");
}
if (request.customs()) {
list.add("customs");
}
if (list.isEmpty()) {
list.add("_all");
}
return newGetRequest(url, "/_cluster/state/" + String.join(",", list)
+ "/" + String.join(",", request.indices()));
}
@Override
protected CheckedFunction<XContentParser, ClusterStateResponse, IOException> entityParser() {
throw new UnsupportedOperationException();
return this::fromXContent;
}
@Override
protected ClusterStateResponse emptyResponse() {
return new ClusterStateResponse();
}
private ClusterStateResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
ClusterName clusterName = null;
ClusterState.Builder builder = null;
Integer length = null;
String currentFieldName = parser.currentName();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (CLUSTER_NAME.equals(currentFieldName)) {
clusterName = new ClusterName(parser.text());
builder = ClusterState.builder(clusterName);
}
if (COMPRESSED_SIZE_IN_BYTES.equals(currentFieldName)) {
length = parser.intValue();
}
}
if (METADATA.equals(currentFieldName)) {
// MetaData.fromXContent(parser) is broken because of "meta-data"
parser.nextToken();
builder.metaData(metadataFromXContent(parser));
}
}
ClusterState clusterState = builder.build();
return new ClusterStateResponse(clusterName, clusterState, length);
}
private MetaData metadataFromXContent(XContentParser parser) throws IOException {
MetaData.Builder builder = new MetaData.Builder();
XContentParser.Token token = parser.currentToken();
String currentFieldName = parser.currentName();
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Expected a START_OBJECT but got " + token);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("settings".equals(currentFieldName)) {
builder.persistentSettings(Settings.fromXContent(parser));
} else if ("indices".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
//IndexMetaData.Builder.fromXContent is broken
//builder.put(IndexMetaData.Builder.fromXContent(parser), false);
builder.put(indexMetaDataFromXContent(parser), false);
}
} else if ("templates".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(IndexTemplateMetaData.Builder.fromXContent(parser, parser.currentName()));
}
} else if ("index-graveyard".equals(currentFieldName)) {
parser.skipChildren();
} else {
try {
MetaData.Custom custom = parser.namedObject(MetaData.Custom.class, currentFieldName, null);
builder.putCustom(custom.getWriteableName(), custom);
} catch (NamedObjectNotFoundException ex) {
logger.warn("Skipping unknown custom object with type {}", currentFieldName);
parser.skipChildren();
}
}
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
// private field
//builder.version = parser.longValue();
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
// private field
//builder.clusterUUID = parser.text();
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new IllegalArgumentException("Unexpected token " + token);
}
}
return builder.build();
}
private static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
private static final String KEY_VERSION = "version";
private static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
private static final String KEY_SETTINGS = "settings";
private static final String KEY_STATE = "state";
private static final String KEY_MAPPINGS = "mappings";
private static final String KEY_ALIASES = "aliases";
private static final String KEY_PRIMARY_TERMS = "primary_terms";
private IndexMetaData indexMetaDataFromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
parser.nextToken();
}
if (parser.currentToken() != XContentParser.Token.FIELD_NAME) {
throw new IllegalArgumentException("expected field name but got a " + parser.currentToken());
}
IndexMetaData.Builder builder = new IndexMetaData.Builder(parser.currentName());
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected object but got a " + token);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (KEY_SETTINGS.equals(currentFieldName)) {
builder.settings(Settings.fromXContent(parser));
} else if (KEY_MAPPINGS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
Map<String, Object> mappingSource = MapBuilder.<String, Object>newMapBuilder().put(currentFieldName, parser.mapOrdered()).map();
builder.putMapping(new MappingMetaData(currentFieldName, mappingSource));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
} else if (KEY_ALIASES.equals(currentFieldName)) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
builder.putAlias(AliasMetaData.Builder.fromXContent(parser));
}
} else if (KEY_IN_SYNC_ALLOCATIONS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
Set<String> allocationIds = new HashSet<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
allocationIds.add(parser.text());
}
}
builder.putInSyncAllocationIds(Integer.valueOf(currentFieldName), allocationIds);
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
} else if (KEY_PRIMARY_TERMS.equals(currentFieldName)) {
parser.skipChildren(); // TODO
} else {
throw new IllegalArgumentException("Unexpected field for an object " + currentFieldName);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (KEY_MAPPINGS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
builder.putMapping(new MappingMetaData(new CompressedXContent(parser.binaryValue())));
} else {
Map<String, Object> mapping = parser.mapOrdered();
if (mapping.size() == 1) {
String mappingType = mapping.keySet().iterator().next();
builder.putMapping(new MappingMetaData(mappingType, mapping));
}
}
}
} else if (KEY_PRIMARY_TERMS.equals(currentFieldName)) {
LongArrayList list = new LongArrayList();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_NUMBER) {
list.add(parser.longValue());
} else {
throw new IllegalStateException("found a non-numeric value under [" + KEY_PRIMARY_TERMS + "]");
}
}
// private fiels
//builder.primaryTerms(list.toArray());
} else if (KEY_ALIASES.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
builder.putAlias(new AliasMetaData.Builder(parser.text()).build());
}
} else {
throw new IllegalArgumentException("Unexpected field for an array " + currentFieldName);
}
} else if (token.isValue()) {
if (KEY_STATE.equals(currentFieldName)) {
builder.state(IndexMetaData.State.fromString(parser.text()));
} else if (KEY_VERSION.equals(currentFieldName)) {
builder.version(parser.longValue());
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
builder.setRoutingNumShards(parser.intValue());
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new IllegalArgumentException("Unexpected token " + token);
}
}
return builder.build();
}
}

View file

@ -0,0 +1,48 @@
package org.xbib.elx.http.action.admin.indices.alias;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
public class HttpIndicesAliasesAction extends HttpAction<IndicesAliasesRequest, IndicesAliasesResponse> {
@Override
public IndicesAliasesAction getActionInstance() {
return IndicesAliasesAction.INSTANCE;
}
@Override
protected RequestBuilder createHttpRequest(String url, IndicesAliasesRequest request) {
try {
XContentBuilder builder = JsonXContent.contentBuilder();
request.toXContent(builder, ToXContent.EMPTY_PARAMS);
String body = Strings.toString(builder);
logger.log(Level.DEBUG, "body = " + body);
return newPostRequest(url, "/_aliases", body);
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
}
}
@Override
protected CheckedFunction<XContentParser, IndicesAliasesResponse, IOException> entityParser() {
return IndicesAliasesResponse::fromXContent;
}
@Override
protected IndicesAliasesResponse emptyResponse() {
return new IndicesAliasesResponse();
}
}

View file

@ -0,0 +1,87 @@
package org.xbib.elx.http.action.admin.indices.alias.get;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class HttpGetAliasAction extends HttpAction<GetAliasesRequest, GetAliasesResponse> {
@Override
protected RequestBuilder createHttpRequest(String url, GetAliasesRequest request) {
// beware of this inconsistency, request.indices() always return empty array
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
String aliases = request.aliases() != null ? String.join(",", request.aliases()) + "/" : "";
// do not add "/" in front of index
return newGetRequest(url, index + "_alias/" + aliases);
}
@Override
public GenericAction<GetAliasesRequest, GetAliasesResponse> getActionInstance() {
return GetAliasesAction.INSTANCE;
}
@Override
protected CheckedFunction<XContentParser, GetAliasesResponse, IOException> entityParser() {
return this::fromXContent;
}
@Override
protected GetAliasesResponse emptyResponse() {
ImmutableOpenMap.Builder<String, List<AliasMetaData>> aliasesBuilder = ImmutableOpenMap.builder();
return new GetAliasesResponse(aliasesBuilder.build());
}
private GetAliasesResponse fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();
}
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
ImmutableOpenMap.Builder<String, List<AliasMetaData>> aliasesBuilder = ImmutableOpenMap.builder();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String indexName = parser.currentName();
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
List<AliasMetaData> parseInside = parseAliases(parser);
aliasesBuilder.put(indexName, parseInside);
}
}
}
return new GetAliasesResponse(aliasesBuilder.build());
}
private static List<AliasMetaData> parseAliases(XContentParser parser) throws IOException {
List<AliasMetaData> aliases = new ArrayList<>();
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("aliases".equals(currentFieldName)) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
AliasMetaData fromXContent = AliasMetaData.Builder.fromXContent(parser);
aliases.add(fromXContent);
}
} else {
parser.skipChildren();
}
} else if (token == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
}
}
return aliases;
}
}

View file

@ -32,4 +32,9 @@ public class HttpCreateIndexAction extends HttpAction<CreateIndexRequest, Create
protected CheckedFunction<XContentParser, CreateIndexResponse, IOException> entityParser() {
return CreateIndexResponse::fromXContent;
}
@Override
protected CreateIndexResponse emptyResponse() {
return new CreateIndexResponse();
}
}

View file

@ -19,11 +19,16 @@ public class HttpDeleteIndexAction extends HttpAction<DeleteIndexRequest, Delete
@Override
protected RequestBuilder createHttpRequest(String url, DeleteIndexRequest deleteIndexRequest) {
return newPutRequest(url, "/" + String.join(",", deleteIndexRequest.indices()));
return newDeleteRequest(url, "/" + String.join(",", deleteIndexRequest.indices()));
}
@Override
protected CheckedFunction<XContentParser, DeleteIndexResponse, IOException> entityParser() {
return DeleteIndexResponse::fromXContent;
}
@Override
protected DeleteIndexResponse emptyResponse() {
return new DeleteIndexResponse();
}
}

View file

@ -0,0 +1,47 @@
package org.xbib.elx.http.action.admin.indices.exists.indices;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.xbib.elx.http.HttpAction;
import org.xbib.elx.http.HttpActionContext;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
public class HttpIndicesExistsAction extends HttpAction<IndicesExistsRequest, IndicesExistsResponse> {
@Override
public IndicesExistsAction getActionInstance() {
return IndicesExistsAction.INSTANCE;
}
@Override
protected RequestBuilder createHttpRequest(String url, IndicesExistsRequest request) {
String index = String.join(",", request.indices());
return newHeadRequest(url, index);
}
@Override
protected CheckedFunction<XContentParser, IndicesExistsResponse, IOException> entityParser() {
return this::fromXContent;
}
@Override
protected IndicesExistsResponse emptyResponse() {
return new IndicesExistsResponse(false); // used for 404 Not found
}
@Override
protected ElasticsearchStatusException parseToError(HttpActionContext<IndicesExistsRequest, IndicesExistsResponse> httpActionContext) {
return new ElasticsearchStatusException("not found", RestStatus.NOT_FOUND);
}
private IndicesExistsResponse fromXContent(XContentParser parser) throws IOException {
return new IndicesExistsResponse(true); // used for 200 OK
}
}

View file

@ -19,12 +19,17 @@ public class HttpRefreshIndexAction extends HttpAction<RefreshRequest, RefreshRe
@Override
protected RequestBuilder createHttpRequest(String url, RefreshRequest request) {
String index = request.indices() != null ? "/" + String.join(",", request.indices()) : "";
return newPostRequest(url, index + "/_refresh");
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newPostRequest(url, "/" + index + "_refresh");
}
@Override
protected CheckedFunction<XContentParser, RefreshResponse, IOException> entityParser() {
return RefreshResponse::fromXContent;
}
@Override
protected RefreshResponse emptyResponse() {
return new RefreshResponse();
}
}

View file

@ -0,0 +1,91 @@
package org.xbib.elx.http.action.admin.indices.settings.get;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class HttpGetSettingsAction extends HttpAction<GetSettingsRequest, GetSettingsResponse> {
@Override
public GetSettingsAction getActionInstance() {
return GetSettingsAction.INSTANCE;
}
@Override
protected RequestBuilder createHttpRequest(String url, GetSettingsRequest request) {
// beware, request.indices() is always an empty array
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newGetRequest(url, index + "_settings");
}
@Override
protected CheckedFunction<XContentParser, GetSettingsResponse, IOException> entityParser() {
return this::fromXContent;
}
@Override
protected GetSettingsResponse emptyResponse() {
ImmutableOpenMap<String, Settings> settingsMap = ImmutableOpenMap.<String, Settings>builder().build();
return new GetSettingsResponse(settingsMap);
}
private GetSettingsResponse fromXContent(XContentParser parser) throws IOException {
Map<String, Settings> indexToSettings = new HashMap<>();
Map<String, Settings> indexToDefaultSettings = new HashMap<>();
if (parser.currentToken() == null) {
parser.nextToken();
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
parser.nextToken();
while (!parser.isClosed()) {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parseIndexEntry(parser, indexToSettings, indexToDefaultSettings);
} else if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
} else {
parser.nextToken();
}
}
ImmutableOpenMap<String, Settings> settingsMap = ImmutableOpenMap.<String, Settings>builder().putAll(indexToSettings).build();
return new GetSettingsResponse(settingsMap);
}
private static void parseIndexEntry(XContentParser parser, Map<String, Settings> indexToSettings,
Map<String, Settings> indexToDefaultSettings) throws IOException {
String indexName = parser.currentName();
parser.nextToken();
while (!parser.isClosed() && parser.currentToken() != XContentParser.Token.END_OBJECT) {
parseSettingsField(parser, indexName, indexToSettings, indexToDefaultSettings);
}
}
private static void parseSettingsField(XContentParser parser, String currentIndexName, Map<String, Settings> indexToSettings,
Map<String, Settings> indexToDefaultSettings) throws IOException {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
switch (parser.currentName()) {
case "settings":
indexToSettings.put(currentIndexName, Settings.fromXContent(parser));
break;
case "defaults":
indexToDefaultSettings.put(currentIndexName, Settings.fromXContent(parser));
break;
default:
parser.skipChildren();
}
} else if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
}
parser.nextToken();
}
}

View file

@ -15,7 +15,6 @@ import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
import java.io.UncheckedIOException;
public class HttpUpdateSettingsAction extends HttpAction<UpdateSettingsRequest, UpdateSettingsResponse> {
@Override
@ -30,8 +29,8 @@ public class HttpUpdateSettingsAction extends HttpAction<UpdateSettingsRequest,
builder.startObject();
request.settings().toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
String index = request.indices() != null ? "/" + String.join(",", request.indices()) : "";
return newPutRequest(url, index + "/_settings", BytesReference.bytes(builder));
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newPutRequest(url, "/" + index + "_settings", BytesReference.bytes(builder));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@ -41,4 +40,9 @@ public class HttpUpdateSettingsAction extends HttpAction<UpdateSettingsRequest,
protected CheckedFunction<XContentParser, UpdateSettingsResponse, IOException> entityParser() {
return UpdateSettingsResponse::fromXContent;
}
@Override
protected UpdateSettingsResponse emptyResponse() {
return new UpdateSettingsResponse();
}
}

View file

@ -2,12 +2,16 @@ package org.xbib.elx.http.action.bulk;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
@ -21,12 +25,12 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
}
@Override
protected RequestBuilder createHttpRequest(String url, BulkRequest request) {
protected RequestBuilder createHttpRequest(String url, BulkRequest request) throws IOException {
StringBuilder bulkContent = new StringBuilder();
for (DocWriteRequest<?> actionRequest : request.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
bulkContent.append("{\"").append(indexRequest.opType()).append("\":{");
bulkContent.append("{\"").append(indexRequest.opType().getLowercase()).append("\":{");
bulkContent.append("\"_index\":\"").append(indexRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append(indexRequest.type()).append("\"");
if (indexRequest.id() != null) {
@ -47,6 +51,31 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
bulkContent.append("}}\n");
bulkContent.append(indexRequest.source().utf8ToString());
bulkContent.append("\n");
} else if (actionRequest instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
bulkContent.append("{\"update\":{");
bulkContent.append("\"_index\":\"").append(updateRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append(updateRequest.type()).append("\"");
bulkContent.append(",\"_id\":\"").append(updateRequest.id()).append("\"");
if (updateRequest.routing() != null) {
bulkContent.append(",\"_routing\":\"").append(updateRequest.routing()).append("\"");
}
if (updateRequest.parent() != null) {
bulkContent.append(",\"_parent\":\"").append(updateRequest.parent()).append("\"");
}
if (updateRequest.version() > 0) {
bulkContent.append(",\"_version\":\"").append(updateRequest.version()).append("\"");
if (updateRequest.versionType() != null) {
bulkContent.append(",\"_version_type\":\"").append(updateRequest.versionType().name()).append("\"");
}
}
bulkContent.append("}}\n");
// only 'doc' supported
if (updateRequest.doc() != null) {
bulkContent.append("{\"doc\":");
bulkContent.append(XContentHelper.convertToJson(updateRequest.doc().source(), false, XContentType.JSON));
bulkContent.append("}\n");
}
} else if (actionRequest instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) actionRequest;
bulkContent.append("{\"delete\":{");
@ -66,4 +95,11 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
protected CheckedFunction<XContentParser, BulkResponse, IOException> entityParser() {
return BulkResponse::fromXContent;
}
@Override
protected BulkResponse emptyResponse() {
BulkItemResponse[] responses = null;
long took = 0L;
return new BulkResponse(responses, took);
}
}

View file

@ -11,8 +11,6 @@ import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
/**
*/
public class HttpExistsAction extends HttpAction<GetRequest, GetResponse> {
@Override
@ -22,11 +20,16 @@ public class HttpExistsAction extends HttpAction<GetRequest, GetResponse> {
@Override
protected RequestBuilder createHttpRequest(String url, GetRequest request) {
return newHeadRequest(url, request.index() + "/" + request.type() + "/" + request.id());
return newHeadRequest(url, "/" + request.index() + "/" + request.type() + "/" + request.id());
}
@Override
protected CheckedFunction<XContentParser, GetResponse, IOException> entityParser() {
return GetResponse::fromXContent;
}
@Override
protected GetResponse emptyResponse() {
return new GetResponse();
}
}

View file

@ -11,8 +11,6 @@ import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
/**
*/
public class HttpGetAction extends HttpAction<GetRequest, GetResponse> {
@Override
@ -22,11 +20,16 @@ public class HttpGetAction extends HttpAction<GetRequest, GetResponse> {
@Override
protected RequestBuilder createHttpRequest(String url, GetRequest request) {
return newGetRequest(url, request.index() + "/" + request.type() + "/" + request.id());
return newGetRequest(url, "/" + request.index() + "/" + request.type() + "/" + request.id());
}
@Override
protected CheckedFunction<XContentParser, GetResponse, IOException> entityParser() {
return GetResponse::fromXContent;
}
@Override
protected GetResponse emptyResponse() {
return new GetResponse();
}
}

View file

@ -11,8 +11,6 @@ import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
/**
*/
public class HttpIndexAction extends HttpAction<IndexRequest, IndexResponse> {
@Override
@ -22,7 +20,7 @@ public class HttpIndexAction extends HttpAction<IndexRequest, IndexResponse> {
@Override
protected RequestBuilder createHttpRequest(String url, IndexRequest request) {
return newPutRequest(url, request.index() + "/" + request.type() + "/" + request.id(),
return newPutRequest(url, "/" + request.index() + "/" + request.type() + "/" + request.id(),
request.source());
}
@ -30,4 +28,9 @@ public class HttpIndexAction extends HttpAction<IndexRequest, IndexResponse> {
protected CheckedFunction<XContentParser, IndexResponse, IOException> entityParser() {
return IndexResponse::fromXContent;
}
@Override
protected IndexResponse emptyResponse() {
return new IndexResponse();
}
}

View file

@ -27,4 +27,9 @@ public class HttpMainAction extends HttpAction<MainRequest, MainResponse> {
protected CheckedFunction<XContentParser, MainResponse, IOException> entityParser() {
return MainResponse::fromXContent;
}
@Override
protected MainResponse emptyResponse() {
return new MainResponse();
}
}

View file

@ -19,12 +19,18 @@ public class HttpSearchAction extends HttpAction<SearchRequest, SearchResponse>
@Override
protected RequestBuilder createHttpRequest(String url, SearchRequest request) {
String index = request.indices() != null ? "/" + String.join(",", request.indices()) : "";
return newPostRequest(url, index + "/_search", request.source().toString());
// request.indices() always empty array
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
return newPostRequest(url, index + "_search", request.source().toString());
}
@Override
protected CheckedFunction<XContentParser, SearchResponse, IOException> entityParser() {
return SearchResponse::fromXContent;
}
@Override
protected SearchResponse emptyResponse() {
return new SearchResponse();
}
}

View file

@ -15,8 +15,6 @@ import org.xbib.netty.http.client.RequestBuilder;
import java.io.IOException;
/**
*/
public class HttpUpdateAction extends HttpAction<UpdateRequest, UpdateResponse> {
@Override
@ -49,7 +47,7 @@ public class HttpUpdateAction extends HttpAction<UpdateRequest, UpdateResponse>
}
BytesReference source = XContentHelper.toXContent(updateRequest, xContentType, false);
return newPostRequest(url,
updateRequest.index() + "/" + updateRequest.type() + "/" + updateRequest.id() + "/_update",
"/" + updateRequest.index() + "/" + updateRequest.type() + "/" + updateRequest.id() + "/_update",
source);
} catch (IOException e) {
logger.error(e.getMessage(), e);
@ -61,4 +59,9 @@ public class HttpUpdateAction extends HttpAction<UpdateRequest, UpdateResponse>
protected CheckedFunction<XContentParser, UpdateResponse, IOException> entityParser() {
return UpdateResponse::fromXContent;
}
@Override
protected UpdateResponse emptyResponse() {
return new UpdateResponse();
}
}

View file

@ -1,8 +1,15 @@
org.elasticsearch.action.admin.indices.mapping.get.HttpGetMappingsAction
org.xbib.elx.http.action.admin.cluster.health.HttpClusterHealthAction
org.xbib.elx.http.action.admin.cluster.node.info.HttpNodesInfoAction
org.xbib.elx.http.action.admin.cluster.settings.HttpClusterUpdateSettingsAction
org.xbib.elx.http.action.admin.cluster.state.HttpClusterStateAction
org.xbib.elx.http.action.admin.indices.alias.HttpIndicesAliasesAction
org.xbib.elx.http.action.admin.indices.alias.get.HttpGetAliasAction
org.xbib.elx.http.action.admin.indices.create.HttpCreateIndexAction
org.xbib.elx.http.action.admin.indices.delete.HttpDeleteIndexAction
org.xbib.elx.http.action.admin.indices.exists.indices.HttpIndicesExistsAction
org.xbib.elx.http.action.admin.indices.refresh.HttpRefreshIndexAction
org.xbib.elx.http.action.admin.indices.settings.get.HttpGetSettingsAction
org.xbib.elx.http.action.admin.indices.settings.put.HttpUpdateSettingsAction
org.xbib.elx.http.action.bulk.HttpBulkAction
org.xbib.elx.http.action.index.HttpIndexAction

View file

@ -14,7 +14,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
@ -31,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled
@ExtendWith(TestExtension.class)
class ClientTest {

View file

@ -5,7 +5,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
@ -20,15 +19,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled
@ExtendWith(TestExtension.class)
class DuplicateIDTest {
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName());
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
private static final Long ACTIONS = 12345L;
private static final Long ACTIONS = 50L;
private final TestExtension.Helper helper;

View file

@ -7,7 +7,6 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsReques
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexPruneResult;
@ -25,7 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled
@ExtendWith(TestExtension.class)
class IndexPruneTest {

View file

@ -1,5 +1,6 @@
package org.xbib.elx.http.test;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@ -21,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled
@ExtendWith(TestExtension.class)
class IndexShiftTest {
@ -61,13 +61,16 @@ class IndexShiftTest {
assertTrue(indexShiftResult.getMovedAliases().isEmpty());
Map<String, String> aliases = client.getAliases("test1234");
logger.log(Level.DEBUG, "aliases = " + aliases);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
String resolved = client.resolveAlias("test");
logger.log(Level.DEBUG, "resolved = " + resolved);
aliases = client.getAliases(resolved);
logger.log(Level.DEBUG, "aliases = " + aliases);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));

View file

@ -4,7 +4,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.api.IndexDefinition;
@ -17,7 +16,6 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@Disabled
@ExtendWith(TestExtension.class)
class SmokeTest {
@ -36,16 +34,19 @@ class SmokeTest {
.put(helper.getHttpSettings())
.build();
try {
assertEquals(helper.getClusterName(), client.getClusterName());
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
assertEquals(helper.getClusterName(), client.getClusterName());
client.checkMapping("test");
client.update("test", "1", "{ \"name\" : \"Another name\"}");
client.delete("test", "1");
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
client.waitForRecovery("test", 10L, TimeUnit.SECONDS);
client.delete("test", "1");
client.flush();
client.deleteIndex("test");
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test", Settings.builder()
.build());
@ -58,7 +59,7 @@ class SmokeTest {
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
assertEquals(0, client.getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
assertEquals(5, client.getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@ -6,7 +6,7 @@
</Console>
</appenders>
<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="Console" />
</Root>
</Loggers>

View file

@ -27,7 +27,7 @@ class DuplicateIDTest {
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
private static final Long ACTIONS = 5L;
private static final Long ACTIONS = 50L;
private final TestExtension.Helper helper;

View file

@ -40,6 +40,9 @@ class SmokeTest {
client.delete("test", "1");
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
client.waitForRecovery("test", 10L, TimeUnit.SECONDS);
client.delete("test", "1");
client.flush();
client.checkMapping("test");
client.deleteIndex("test");
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test", Settings.builder()
@ -55,7 +58,7 @@ class SmokeTest {
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
assertEquals(0, client.getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
assertEquals(5, client.getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 6.3.2.2
version = 6.3.2.3
profile = default
release = 0

View file

@ -1,5 +1,6 @@
#Mon May 06 11:25:14 CEST 2019
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.3-all.zip