adapting to ES 7.10.2

This commit is contained in:
Jörg Prante 2021-01-29 01:17:06 +01:00
parent 1a251acacf
commit a21e0aef5e
42 changed files with 258 additions and 343 deletions

View file

@ -1,3 +0,0 @@
language: java
jdk:
- openjdk11

View file

@ -99,9 +99,9 @@ public interface AdminClient extends BasicClient {
* Resolve alias.
*
* @param alias the alias
* @return this index name behind the alias or the alias if there is no index
* @return the index names behind the alias or an empty list if there is no such index
*/
String resolveAlias(String alias);
List<String> resolveAlias(String alias);
/**
* Resolve alias to all connected indices, sort index names with most recent timestamp on top, return this index

View file

@ -33,9 +33,10 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -64,7 +65,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@ -76,7 +76,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
@ -231,19 +230,30 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
@Override
public String resolveAlias(String alias) {
public List<String> resolveAlias(String alias) {
if (alias == null) {
return List.of();
}
ensureClientIsPresent();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.blocks(false);
clusterStateRequest.metaData(true);
clusterStateRequest.metadata(true);
clusterStateRequest.nodes(false);
clusterStateRequest.routingTable(false);
clusterStateRequest.customs(false);
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
SortedMap<String, AliasOrIndex> map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup();
AliasOrIndex aliasOrIndex = map.get(alias);
return aliasOrIndex != null ? aliasOrIndex.getIndices().iterator().next().getIndex().getName() : null;
IndexAbstraction indexAbstraction = clusterStateResponse.getState().getMetadata()
.getIndicesLookup().get(alias);
if (indexAbstraction == null) {
return List.of();
}
List<IndexMetadata> indexMetadata = indexAbstraction.getIndices();
if (indexMetadata == null) {
return List.of();
}
return indexMetadata.stream().map(im -> im.getIndex().getName())
.sorted().collect(Collectors.toList());
}
@Override
@ -283,8 +293,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
// two situations: 1. a new alias 2. there is already an old index with the alias
String oldIndex = resolveAlias(index);
Map<String, String> oldAliasMap = index.equals(oldIndex) ? null : getAliases(oldIndex);
List<String> oldIndices = resolveAlias(index);
String oldIndex = oldIndices.stream().findFirst().orElse(null);
Map<String, String> oldAliasMap = getAliases(oldIndex);
logger.debug("old index = {} old alias map = {}", oldIndex, oldAliasMap);
final List<String> newAliases = new ArrayList<>();
final List<String> moveAliases = new ArrayList<>();
@ -466,9 +477,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
String dateTimePattern = settings.get("dateTimePattern");
if (dateTimePattern != null) {
// check if index name with current date already exists, resolve to it
fullIndexName = resolveAlias(indexName + DateTimeFormatter.ofPattern(dateTimePattern)
.withZone(ZoneId.systemDefault()) // not GMT
.format(LocalDate.now()));
String dateAppendix = DateTimeFormatter.ofPattern(dateTimePattern)
.withZone(ZoneId.systemDefault()) // not GMT
.format(LocalDate.now());
fullIndexName = resolveAlias(indexName + dateAppendix).stream().findFirst().orElse(index);
} else {
// check if index name already exists, resolve to it
fullIndexName = resolveMostRecentIndex(indexName);
@ -510,12 +522,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
ensureClientIsPresent();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetadata>> map = getMappingsResponse.getMappings();
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
ImmutableOpenMap<String, MappingMetaData> mappings = map.get(stringObjectCursor.value);
for (ObjectObjectCursor<String, MappingMetaData> cursor : mappings) {
ImmutableOpenMap<String, MappingMetadata> mappings = map.get(stringObjectCursor.value);
for (ObjectObjectCursor<String, MappingMetadata> cursor : mappings) {
String mappingName = cursor.key;
MappingMetaData mappingMetaData = cursor.value;
MappingMetadata mappingMetaData = cursor.value;
checkMapping(index, mappingName, mappingMetaData);
}
});
@ -568,21 +580,20 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
private Map<String, String> getFilters(GetAliasesResponse getAliasesResponse) {
Map<String, String> result = new HashMap<>();
for (ObjectObjectCursor<String, List<AliasMetaData>> object : getAliasesResponse.getAliases()) {
List<AliasMetaData> aliasMetaDataList = object.value;
for (AliasMetaData aliasMetaData : aliasMetaDataList) {
if (aliasMetaData.filteringRequired()) {
result.put(aliasMetaData.alias(),
new String(aliasMetaData.getFilter().uncompressed(), StandardCharsets.UTF_8));
for (ObjectObjectCursor<String, List<AliasMetadata>> object : getAliasesResponse.getAliases()) {
List<AliasMetadata> aliasMetadataList = object.value;
for (AliasMetadata aliasMetadata : aliasMetadataList) {
if (aliasMetadata.filteringRequired()) {
result.put(aliasMetadata.alias(),aliasMetadata.getFilter().string());
} else {
result.put(aliasMetaData.alias(), null);
result.put(aliasMetadata.alias(), null);
}
}
}
return result;
}
private void checkMapping(String index, String type, MappingMetaData mappingMetaData) {
private void checkMapping(String index, String type, MappingMetadata mappingMetaData) {
try {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(index)

View file

@ -44,7 +44,6 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override
public void setClient(ElasticsearchClient client) {
logger.log(Level.INFO, "setting client = " + client);
this.client = client;
}
@ -56,7 +55,7 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override
public void init(Settings settings) throws IOException {
if (closed.compareAndSet(false, true)) {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
logger.log(Level.DEBUG, "initializing with settings = " + settings.toDelimitedString(','));
this.settings = settings;
setClient(createClient(settings));
} else {
@ -102,7 +101,6 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent();
logger.info("waiting for cluster shard settling");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.waitForNoInitializingShards(true)

View file

@ -70,16 +70,16 @@ public class DefaultBulkController implements BulkController {
public void init(Settings settings) {
bulkMetric.init(settings);
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(),
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum());
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.asInteger());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(),
Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum());
Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.asInteger());
TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(),
TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum()));
TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.asInteger()));
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.asString(),
"maxVolumePerRequest"));
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
Parameters.ENABLE_BULK_LOGGING.getValue());
Parameters.ENABLE_BULK_LOGGING.asBool());
BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)

View file

@ -20,11 +20,11 @@ public enum Parameters {
FLUSH_INTERVAL("flush_interval");
boolean flag;
private boolean flag = false;
int num;
private int num = -1;
String string;
private String string;
Parameters(boolean flag) {
this.flag = flag;
@ -38,15 +38,15 @@ public enum Parameters {
this.string = string;
}
boolean getValue() {
public boolean asBool() {
return flag;
}
int getNum() {
public int asInteger() {
return num;
}
String getString() {
public String asString() {
return string;
}
}

View file

@ -84,9 +84,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName());
NodesInfoResponse response = helper.client("1").execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getTransport().getAddress().publishAddress();
TransportAddress address = response.getNodes().get(0).getNode().getAddress();
String host = address.address().getHostName();
int port = address.address().getPort();
try {

View file

@ -9,5 +9,9 @@
<Root level="info">
<AppenderRef ref="Console" />
</Root>
<!-- Remove WARN on elastic embedded because dangling indices cannot be detected -->
<Logger name="org.elasticsearch.gateway.DanglingIndicesState" level="error" />
<!-- Remove WARN on elastic embedded because node and cluster identifiers change between test suites -->
<Logger name="org.elasticsearch.cluster.service.ClusterApplierService" level="error" />
</Loggers>
</configuration>

View file

@ -5,9 +5,5 @@ dependencies{
api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
api "org.xbib:netty-http-client:${project.property('xbib-netty-http.version')}"
runtimeOnly "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
if (Os.isFamily(Os.FAMILY_MAC)) {
runtimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}"
} else if (Os.isFamily(Os.FAMILY_UNIX)) {
runtimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
}
runtimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
}

View file

@ -23,8 +23,8 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.xbib.net.URL;
import org.xbib.netty.http.client.api.ClientTransport;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.client.api.Transport;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
@ -78,7 +78,10 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
}
}
});
Transport transport = httpActionContext.getExtendedHttpClient().internalClient().execute(httpRequest);
if (logger.isDebugEnabled()) {
logger.log(Level.DEBUG, "executing HTTP request " + httpRequest);
}
ClientTransport transport = httpActionContext.getExtendedHttpClient().internalClient().execute(httpRequest);
httpActionContext.setHttpClientTransport(transport);
if (transport.isFailed()) {
listener.onFailure(new Exception(transport.getFailure()));

View file

@ -2,7 +2,7 @@ package org.xbib.elx.http;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.xbib.netty.http.client.api.Transport;
import org.xbib.netty.http.client.api.ClientTransport;
import org.xbib.netty.http.common.HttpResponse;
/**
@ -19,7 +19,7 @@ public class HttpActionContext<R extends ActionRequest, T extends ActionResponse
private final String url;
private Transport httpClientTransport;
private ClientTransport httpClientTransport;
private HttpResponse httpResponse;
@ -41,11 +41,11 @@ public class HttpActionContext<R extends ActionRequest, T extends ActionResponse
return url;
}
public void setHttpClientTransport(Transport httpClientTransport) {
public void setHttpClientTransport(ClientTransport httpClientTransport) {
this.httpClientTransport = httpClientTransport;
}
public Transport getHttpClientTransport() {
public ClientTransport getHttpClientTransport() {
return httpClientTransport;
}

View file

@ -1,98 +0,0 @@
package org.xbib.elx.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.xbib.netty.http.client.api.Transport;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class HttpActionFuture<T, L> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<L> {
private Transport httpClientTransport;
HttpActionFuture<T, L> setHttpClientTransport(Transport httpClientTransport) {
this.httpClientTransport = httpClientTransport;
return this;
}
@Override
public T actionGet() {
try {
httpClientTransport.get();
return get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("future got interrupted", e);
} catch (ExecutionException e) {
throw rethrowExecutionException(e);
}
}
@Override
public T actionGet(String timeout) {
return actionGet(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".actionGet.timeout"));
}
@Override
public T actionGet(long timeoutMillis) {
return actionGet(timeoutMillis, TimeUnit.MILLISECONDS);
}
@Override
public T actionGet(TimeValue timeout) {
return actionGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
@Override
public T actionGet(long timeout, TimeUnit unit) {
try {
return get(timeout, unit);
} catch (TimeoutException e) {
throw new ElasticsearchTimeoutException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw rethrowExecutionException(e);
}
}
private static RuntimeException rethrowExecutionException(ExecutionException e) {
if (e.getCause() instanceof ElasticsearchException) {
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
Throwable root = esEx.unwrapCause();
if (root instanceof ElasticsearchException) {
return (ElasticsearchException) root;
} else if (root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
} else if (e.getCause() instanceof RuntimeException) {
return (RuntimeException) e.getCause();
} else {
return new UncategorizedExecutionException("Failed execution", e);
}
}
@Override
public void onResponse(L result) {
set(convert(result));
}
@Override
public void onFailure(Exception e) {
setException(e);
}
@SuppressWarnings("unchecked")
private T convert(L listenerResponse) {
return (T) listenerResponse;
}
}

View file

@ -29,7 +29,7 @@ public class HttpBulkClient extends AbstractBulkClient implements ElasticsearchC
}
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
protected ElasticsearchClient createClient(Settings settings) {
return this;
}

View file

@ -42,28 +42,28 @@ public class HttpNodesInfoAction extends HttpAction<NodesInfoRequest, NodesInfoR
path.append("/_all");
}
List<String> metrics = new LinkedList<>();
if (request.http()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.HTTP.metricName())) {
metrics.add("http");
}
if (request.jvm()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.JVM.metricName())) {
metrics.add("jvm");
}
if (request.os()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.OS.metricName())) {
metrics.add("os");
}
if (request.plugins()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.PLUGINS.metricName())) {
metrics.add("plugins");
}
if (request.process()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.PROCESS.metricName())) {
metrics.add("process");
}
if (request.settings()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.SETTINGS.metricName())) {
metrics.add("settings");
}
if (request.threadPool()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.THREAD_POOL.metricName())) {
metrics.add("thread_pool");
}
if (request.transport()) {
if (request.requestedMetrics().contains(NodesInfoRequest.Metric.TRANSPORT.metricName())) {
metrics.add("transport");
}
if (!metrics.isEmpty()) {

View file

@ -1,16 +1,20 @@
package org.xbib.elx.http.action.admin.cluster.state;
import com.carrotsearch.hppc.LongArrayList;
import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent;
@ -36,10 +40,6 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
private static final String COMPRESSED_SIZE_IN_BYTES = "compressed_size_in_bytes";
private static final String BLOCKS = "blocks";
private static final String NODES = "nodes";
private static final String METADATA = "metadata";
@Override
@ -50,7 +50,7 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
@Override
protected Request.Builder createHttpRequest(String url, ClusterStateRequest request) {
List<String> list = new ArrayList<>();
if (request.metaData()) {
if (request.metadata()) {
list.add("metadata");
}
if (request.blocks()) {
@ -79,10 +79,9 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
private ClusterStateResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
ClusterName clusterName = null;
ClusterState.Builder builder = null;
Integer length = null;
ClusterState.Builder builder = ClusterState.builder(new ClusterName(""));
String currentFieldName = parser.currentName();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -93,22 +92,22 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
builder = ClusterState.builder(clusterName);
}
if (COMPRESSED_SIZE_IN_BYTES.equals(currentFieldName)) {
length = parser.intValue();
parser.intValue();
}
}
if (METADATA.equals(currentFieldName)) {
// MetaData.fromXContent(parser) is broken because of "meta-data"
parser.nextToken();
builder.metaData(metadataFromXContent(parser));
// MetaData.fromXContent(parser) is broken (Expected [meta-data] as a field name but got cluster_uuid)
// so we have to replace it
builder.metadata(metadataFromXContent(parser));
}
}
ClusterState clusterState = builder.build();
return new ClusterStateResponse(clusterName, clusterState, true);
}
private MetaData metadataFromXContent(XContentParser parser) throws IOException {
MetaData.Builder builder = new MetaData.Builder();
private Metadata metadataFromXContent(XContentParser parser) throws IOException {
Metadata.Builder builder = new Metadata.Builder();
XContentParser.Token token = parser.currentToken();
String currentFieldName = parser.currentName();
if (token != XContentParser.Token.START_OBJECT) {
@ -118,23 +117,26 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("settings".equals(currentFieldName)) {
if ("cluster_coordination".equals(currentFieldName)) {
builder.coordinationMetadata(CoordinationMetadata.fromXContent(parser));
} else if ("settings".equals(currentFieldName)) {
builder.persistentSettings(Settings.fromXContent(parser));
} else if ("indices".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
//IndexMetaData.Builder.fromXContent is broken
//builder.put(IndexMetaData.Builder.fromXContent(parser), false);
builder.put(indexMetaDataFromXContent(parser), false);
// builder.put(IndexMetadata.Builder.fromXContent(parser), false);
}
} else if ("hashes_of_consistent_settings".equals(currentFieldName)) {
builder.hashesOfConsistentSettings(parser.mapStrings());
} else if ("templates".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(IndexTemplateMetaData.Builder.fromXContent(parser, parser.currentName()));
builder.put(IndexTemplateMetadata.Builder.fromXContent(parser, parser.currentName()));
}
} else if ("index-graveyard".equals(currentFieldName)) {
parser.skipChildren();
} else {
try {
MetaData.Custom custom = parser.namedObject(MetaData.Custom.class, currentFieldName, null);
Metadata.Custom custom = parser.namedObject(Metadata.Custom.class, currentFieldName, null);
builder.putCustom(custom.getWriteableName(), custom);
} catch (NamedObjectNotFoundException ex) {
logger.warn("Skipping unknown custom object with type {}", currentFieldName);
@ -144,10 +146,10 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
// private field
//builder.version = parser.longValue();
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
// private field
//builder.clusterUUID = parser.text();
} else if ("cluster_uuid_committed".equals(currentFieldName)) {
// skip
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
@ -160,29 +162,37 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
private static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
private static final String KEY_VERSION = "version";
private static final String KEY_MAPPING_VERSION = "mapping_version";
private static final String KEY_SETTINGS_VERSION = "settings_version";
private static final String KEY_ALIASES_VERSION = "aliases_version";
private static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
private static final String KEY_ROLLOVER_INFOS = "rollover_info";
private static final String KEY_SYSTEM = "system";
private static final String KEY_SETTINGS = "settings";
private static final String KEY_STATE = "state";
private static final String KEY_MAPPINGS = "mappings";
private static final String KEY_ALIASES = "aliases";
private static final String KEY_PRIMARY_TERMS = "primary_terms";
private IndexMetaData indexMetaDataFromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) { // fresh parser? move to the first token
private IndexMetadata indexMetaDataFromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parser.nextToken();
}
if (parser.currentToken() != XContentParser.Token.FIELD_NAME) {
throw new IllegalArgumentException("expected field name but got a " + parser.currentToken());
}
IndexMetaData.Builder builder = new IndexMetaData.Builder(parser.currentName());
IndexMetadata.Builder builder = new IndexMetadata.Builder(parser.currentName());
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected object but got a " + token);
}
boolean mappingVersion = false;
boolean settingsVersion = false;
boolean aliasesVersion = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -194,15 +204,16 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
Map<String, Object> mappingSource = MapBuilder.<String, Object>newMapBuilder().put(currentFieldName, parser.mapOrdered()).map();
builder.putMapping(new MappingMetaData(currentFieldName, mappingSource));
Map<String, Object> mappingSource =
MapBuilder.<String, Object>newMapBuilder().put(currentFieldName, parser.mapOrdered()).map();
builder.putMapping(new MappingMetadata(currentFieldName, mappingSource));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
} else if (KEY_ALIASES.equals(currentFieldName)) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
builder.putAlias(AliasMetaData.Builder.fromXContent(parser));
builder.putAlias(AliasMetadata.Builder.fromXContent(parser));
}
} else if (KEY_IN_SYNC_ALLOCATIONS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -215,13 +226,30 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
allocationIds.add(parser.text());
}
}
builder.putInSyncAllocationIds(Integer.valueOf(currentFieldName), allocationIds);
builder.putInSyncAllocationIds(Integer.parseInt(currentFieldName), allocationIds);
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
} else if (KEY_PRIMARY_TERMS.equals(currentFieldName)) {
parser.skipChildren(); // TODO
parser.skipChildren();
} else if (KEY_ROLLOVER_INFOS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
builder.putRolloverInfo(RolloverInfo.parse(parser, currentFieldName));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
} else if ("warmers".equals(currentFieldName)) {
// TODO: do this in 6.0:
// throw new IllegalArgumentException("Warmers are not supported anymore - are you upgrading from 1.x?");
// ignore: warmers have been removed in 5.0 and are
// simply ignored when upgrading from 2.x
assert Version.CURRENT.major <= 5;
parser.skipChildren();
} else {
throw new IllegalArgumentException("Unexpected field for an object " + currentFieldName);
}
@ -229,12 +257,12 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
if (KEY_MAPPINGS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
builder.putMapping(new MappingMetaData(new CompressedXContent(parser.binaryValue())));
builder.putMapping(new MappingMetadata(new CompressedXContent(parser.binaryValue())));
} else {
Map<String, Object> mapping = parser.mapOrdered();
if (mapping.size() == 1) {
String mappingType = mapping.keySet().iterator().next();
builder.putMapping(new MappingMetaData(mappingType, mapping));
builder.putMapping(new MappingMetadata(mappingType, mapping));
}
}
}
@ -247,22 +275,31 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
throw new IllegalStateException("found a non-numeric value under [" + KEY_PRIMARY_TERMS + "]");
}
}
// private fiels
//builder.primaryTerms(list.toArray());
} else if (KEY_ALIASES.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
builder.putAlias(new AliasMetaData.Builder(parser.text()).build());
builder.putAlias(new AliasMetadata.Builder(parser.text()).build());
}
} else {
throw new IllegalArgumentException("Unexpected field for an array " + currentFieldName);
}
} else if (token.isValue()) {
if (KEY_STATE.equals(currentFieldName)) {
builder.state(IndexMetaData.State.fromString(parser.text()));
builder.state(IndexMetadata.State.fromString(parser.text()));
} else if (KEY_VERSION.equals(currentFieldName)) {
builder.version(parser.longValue());
} else if (KEY_MAPPING_VERSION.equals(currentFieldName)) {
mappingVersion = true;
builder.mappingVersion(parser.longValue());
} else if (KEY_SETTINGS_VERSION.equals(currentFieldName)) {
settingsVersion = true;
builder.settingsVersion(parser.longValue());
} else if (KEY_ALIASES_VERSION.equals(currentFieldName)) {
aliasesVersion = true;
builder.aliasesVersion(parser.longValue());
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
builder.setRoutingNumShards(parser.intValue());
} else if (KEY_SYSTEM.equals(currentFieldName)) {
builder.system(parser.booleanValue());
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}

View file

@ -4,7 +4,7 @@ import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentParser;
@ -43,13 +43,13 @@ public class HttpGetAliasAction extends HttpAction<GetAliasesRequest, GetAliases
if (parser.currentToken() == null) {
parser.nextToken();
}
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
ImmutableOpenMap.Builder<String, List<AliasMetaData>> aliasesBuilder = ImmutableOpenMap.builder();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
ImmutableOpenMap.Builder<String, List<AliasMetadata>> aliasesBuilder = ImmutableOpenMap.builder();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String indexName = parser.currentName();
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
List<AliasMetaData> parseInside = parseAliases(parser);
List<AliasMetadata> parseInside = parseAliases(parser);
aliasesBuilder.put(indexName, parseInside);
}
}
@ -57,8 +57,8 @@ public class HttpGetAliasAction extends HttpAction<GetAliasesRequest, GetAliases
return new GetAliasesResponse(aliasesBuilder.build());
}
private static List<AliasMetaData> parseAliases(XContentParser parser) throws IOException {
List<AliasMetaData> aliases = new ArrayList<>();
private static List<AliasMetadata> parseAliases(XContentParser parser) throws IOException {
List<AliasMetadata> aliases = new ArrayList<>();
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -67,7 +67,7 @@ public class HttpGetAliasAction extends HttpAction<GetAliasesRequest, GetAliases
} else if (token == XContentParser.Token.START_OBJECT) {
if ("aliases".equals(currentFieldName)) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
AliasMetaData fromXContent = AliasMetaData.Builder.fromXContent(parser);
AliasMetadata fromXContent = AliasMetadata.Builder.fromXContent(parser);
aliases.add(fromXContent);
}
} else {

View file

@ -1,32 +1,22 @@
package org.xbib.elx.http.action.admin.indices.create;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.util.Map;
public class HttpCreateIndexAction extends HttpAction<CreateIndexRequest, CreateIndexResponse> {
public static final ParseField MAPPINGS = new ParseField("mappings");
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField ALIASES = new ParseField("aliases");
@Override
public CreateIndexAction getActionInstance() {
return CreateIndexAction.INSTANCE;
@ -34,7 +24,7 @@ public class HttpCreateIndexAction extends HttpAction<CreateIndexRequest, Create
@Override
protected Request.Builder createHttpRequest(String url, CreateIndexRequest createIndexRequest) throws IOException {
XContentBuilder builder = toXContent(createIndexRequest, XContentFactory.jsonBuilder());
XContentBuilder builder = createIndexRequest.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS);
return newPutRequest(url, "/" + createIndexRequest.index() + "?include_type_name=true",
BytesReference.bytes(builder));
}
@ -43,31 +33,4 @@ public class HttpCreateIndexAction extends HttpAction<CreateIndexRequest, Create
protected CheckedFunction<XContentParser, CreateIndexResponse, IOException> entityParser(HttpResponse httpResponse) {
return CreateIndexResponse::fromXContent;
}
// fixed version from CreateIndexRequest - use only one mapping
private XContentBuilder toXContent(CreateIndexRequest createIndexRequest,
XContentBuilder builder) throws IOException {
builder.startObject();
builder.startObject(SETTINGS.getPreferredName());
createIndexRequest.settings().toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
// there is only an empty or a single entry for mappings
if (createIndexRequest.mappings().isEmpty()) {
// ES wants a mappings element with an empty map
builder.startObject(MAPPINGS.getPreferredName());
builder.endObject();
} else {
Map<String, ?> mappingAsMap = createIndexRequest.mappings();
String mappingString = mappingAsMap.values().iterator().next().toString();
builder.field(MAPPINGS.getPreferredName());
builder.map(XContentHelper.convertToMap(new BytesArray(mappingString), false, XContentType.JSON).v2());
}
builder.startObject(ALIASES.getPreferredName());
for (Alias alias : createIndexRequest.aliases()) {
alias.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
builder.endObject();
return builder;
}
}

View file

@ -3,7 +3,7 @@ package org.xbib.elx.http.action.admin.indices.mapping.get;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -41,12 +41,12 @@ public class HttpGetMappingsAction extends HttpAction<GetMappingsRequest, GetMap
parser.nextToken();
}
Map<String, Object> map = parser.map();
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> builder = new ImmutableOpenMap.Builder<>();
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetadata>> builder = new ImmutableOpenMap.Builder<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
final String indexName = entry.getKey();
final Map<String, Object> mapping = (Map<String, Object>) ((Map<String, Object>) entry.getValue()).get(MAPPINGS.getPreferredName());
ImmutableOpenMap.Builder<String, MappingMetaData> typeBuilder = new ImmutableOpenMap.Builder<>();
MappingMetaData mmd = new MappingMetaData("_doc", mapping);
ImmutableOpenMap.Builder<String, MappingMetadata> typeBuilder = new ImmutableOpenMap.Builder<>();
MappingMetadata mmd = new MappingMetadata("_doc", mapping);
typeBuilder.put("_doc", mmd);
builder.put(indexName, typeBuilder.build());
}

View file

@ -2,7 +2,6 @@ package org.xbib.elx.http.action.bulk;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -33,7 +32,6 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
IndexRequest indexRequest = (IndexRequest) actionRequest;
bulkContent.append("{\"").append(indexRequest.opType().getLowercase()).append("\":{");
bulkContent.append("\"_index\":\"").append(indexRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append("_doc").append("\"");
if (indexRequest.id() != null) {
bulkContent.append(",\"_id\":\"").append(indexRequest.id()).append("\"");
}
@ -53,7 +51,6 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
bulkContent.append("{\"update\":{");
bulkContent.append("\"_index\":\"").append(updateRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append("_doc").append("\"");
bulkContent.append(",\"_id\":\"").append(updateRequest.id()).append("\"");
if (updateRequest.routing() != null) {
bulkContent.append(",\"_routing\":\"").append(updateRequest.routing()).append("\"");
@ -75,7 +72,6 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
DeleteRequest deleteRequest = (DeleteRequest) actionRequest;
bulkContent.append("{\"delete\":{");
bulkContent.append("\"_index\":\"").append(deleteRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append("_doc").append("\"");
bulkContent.append(",\"_id\":\"").append(deleteRequest.id()).append("\"");
if (deleteRequest.routing() != null) {
bulkContent.append(",\"_routing\":\"").append(deleteRequest.routing()).append("\""); // _routing

View file

@ -21,7 +21,7 @@ public class HttpGetAction extends HttpAction<GetRequest, GetResponse> {
@Override
protected Request.Builder createHttpRequest(String url, GetRequest request) {
return newGetRequest(url, "/" + request.index() + "/_doc/" + "/" + request.id());
return newGetRequest(url, "/" + request.index() + "/_doc/" + request.id());
}
@Override

View file

@ -1,6 +1,7 @@
package org.xbib.elx.http.action.index;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
@ -21,7 +22,8 @@ public class HttpIndexAction extends HttpAction<IndexRequest, IndexResponse> {
@Override
protected Request.Builder createHttpRequest(String url, IndexRequest request) {
return newPutRequest(url, "/" + request.index() + "/_doc/" + request.id(),
String optype = request.opType() == DocWriteRequest.OpType.CREATE ? "_create" : "_doc";
return newPutRequest(url, "/" + request.index() + "/" + optype + "/" + request.id(),
request.source());
}

View file

@ -19,6 +19,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -67,7 +68,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
String resolved = adminClient.resolveAlias("test");
String resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null);
logger.log(Level.DEBUG, "resolved = " + resolved);
aliases = adminClient.getAliases(resolved);
logger.log(Level.DEBUG, "aliases = " + aliases);
@ -99,7 +100,8 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test");
resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null);
assertNotNull(resolved);
aliases = adminClient.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));

View file

@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
@ -15,6 +16,7 @@ import org.xbib.elx.http.HttpBulkClient;
import org.xbib.elx.http.HttpBulkClientProvider;
import org.xbib.elx.http.HttpSearchClient;
import org.xbib.elx.http.HttpSearchClientProvider;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@ -52,8 +54,10 @@ class SearchTest {
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
bulkClient.flush();
}
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -62,12 +66,14 @@ class SearchTest {
.setSearchClientProvider(HttpSearchClientProvider.class)
.put(helper.getHttpSettings())
.build()) {
Optional<GetResponse> responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0"));
assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString());
Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()),
TimeValue.timeValueMinutes(1), 10);
long count = stream.count();
assertEquals(numactions, count);
assertEquals(numactions + 1, count);
Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()));
@ -76,9 +82,9 @@ class SearchTest {
logger.info(id);
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
assertEquals(13, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(numactions + 1, idcount.get());
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(3, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
}

View file

@ -1,5 +1,6 @@
package org.xbib.elx.http.test;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
@ -18,6 +19,7 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
@ -84,11 +86,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest()
.clear()
.addMetric(NodesInfoRequest.Metric.HTTP.metricName());
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getHttp().getAddress().publishAddress();
TransportAddress address = response.getNodes().get(0).getInfo(HttpInfo.class).getAddress().publishAddress();
helper.httpHost = address.address().getHostName();
helper.httpPort = address.address().getPort();
logger.log(Level.INFO, "http host = " + helper.httpHost + " port = " + helper.httpPort);
try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
@ -191,8 +196,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
//.put("cluster.initial_master_nodes", "1")
//.put("discovery.seed_hosts", "127.0.0.1:9300")
.build();
}
@ -226,6 +231,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
.put("node.name", id)
.put("path.data", getHome() + "/data-" + id)
.build();
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);

View file

@ -9,5 +9,9 @@
<Root level="info">
<AppenderRef ref="Console" />
</Root>
<!-- Remove WARN on elastic embedded because dangling indices cannot be detected -->
<Logger name="org.elasticsearch.gateway.DanglingIndicesState" level="error" />
<!-- Remove WARN on elastic embedded because node and cluster identifiers change between test suites -->
<Logger name="org.elasticsearch.cluster.service.ClusterApplierService" level="error" />
</Loggers>
</configuration>
</configuration>

View file

@ -4,7 +4,6 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -58,6 +57,8 @@ public class NodeClientHelper {
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder()
.put(settings.filter(key -> !isPrivateSettings(key)))
// We have to keep the legacy settings. This means a lot of noise in the log files abut deprecation and failed handshaking.
// Clearly, ES wants to annoy users of embedded master-less, data-less nodes.
.put("node.master", false)
.put("node.data", false)
// "node.processors"

View file

@ -43,7 +43,7 @@ class BulkClientTest {
void testSingleDoc() throws Exception {
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30))
.build();
@ -66,7 +66,7 @@ class BulkClientTest {
void testNewIndex() throws Exception {
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
bulkClient.newIndex("test");
@ -77,11 +77,11 @@ class BulkClientTest {
void testMapping() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
.setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build()) {
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
@ -101,7 +101,7 @@ class BulkClientTest {
long numactions = ACTIONS;
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();
@ -131,7 +131,7 @@ class BulkClientTest {
final long actions = ACTIONS;
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads)
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build();

View file

@ -35,7 +35,7 @@ class DuplicateIDTest {
long numactions = ACTIONS;
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build();
try {

View file

@ -37,11 +37,11 @@ class IndexPruneTest {
void testPrune() throws IOException {
final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
.setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build();
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build();
try {
Settings settings = Settings.builder()

View file

@ -18,6 +18,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -36,11 +37,11 @@ class IndexShiftTest {
void testIndexShift() throws Exception {
final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
.setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build();
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build();
try {
Settings settings = Settings.builder()
@ -65,7 +66,8 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
String resolved = adminClient.resolveAlias("test");
String resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null);
assertEquals("test_shift", resolved);
aliases = adminClient.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
@ -95,7 +97,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test");
resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null);
aliases = adminClient.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));

View file

@ -39,7 +39,7 @@ class SearchTest {
long numactions = ACTIONS;
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build();
try (bulkClient) {
@ -60,7 +60,7 @@ class SearchTest {
assertNull(bulkClient.getBulkController().getLastBulkError());
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1"))
.setSearchClientProvider(NodeSearchClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build()) {
Stream<SearchHit> stream = searchClient.search(qb -> qb
.setIndices("test")

View file

@ -32,11 +32,11 @@ class SmokeTest {
void smokeTest() throws Exception {
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
.setAdminClientProvider(NodeAdminClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.put(helper.getNodeSettings())
.put(helper.getNodeSettings("1"))
.build()) {
IndexDefinition indexDefinition =
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);

View file

@ -84,11 +84,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName());
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getTransport().getAddress().publishAddress();
TransportAddress address = response.getNodes().get(0).getNode().getAddress();
helper.host = address.address().getHostName();
helper.port = address.address().getPort();
logger.info("host = " + helper.host + " port = " + helper.port);
try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
@ -103,7 +104,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
logger.info("cluster up, name = {}", clusterStateResponse.getClusterName().value());
}
@Override
@ -117,10 +118,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
for (AbstractClient client : helper.clients.values()) {
client.close();
}
logger.info("closing all nodes");
for (Node node : helper.nodes.values()) {
if (node != null) {
@ -169,8 +166,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
Map<String, Node> nodes = new HashMap<>();
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) {
this.home = home;
}
@ -187,13 +182,11 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return cluster;
}
Settings getNodeSettings() {
Settings getNodeSettings(String id) {
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
.put("node.max_local_storage_nodes", "2")
.put("node.max_local_storage_nodes", 2)
.build();
}
@ -202,7 +195,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
ElasticsearchClient client(String id) {
return clients.get(id);
return nodes.get(id).client();
}
String randomString(int len) {
@ -216,14 +209,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private Node buildNode(String id) {
Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
.put(getNodeSettings(id))
.put("node.name", id)
.build();
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
return node;
}
}

View file

@ -9,5 +9,9 @@
<Root level="info">
<AppenderRef ref="Console" />
</Root>
<!-- Remove WARN on elastic embedded because dangling indices cannot be detected -->
<Logger name="org.elasticsearch.gateway.DanglingIndicesState" level="error" />
<!-- Remove WARN on elastic embedded because node and cluster identifiers change between test suites -->
<Logger name="org.elasticsearch.cluster.service.ClusterApplierService" level="error" />
</Loggers>
</configuration>

View file

@ -18,6 +18,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -65,7 +66,8 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test_shift"));
String resolved = adminClient.resolveAlias("test_shift");
String resolved = adminClient.resolveAlias("test_shift").stream().findFirst().orElse(null);
assertNotNull(resolved);
aliases = adminClient.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
@ -95,7 +97,7 @@ class IndexShiftTest {
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = adminClient.resolveAlias("test_shift");
resolved = adminClient.resolveAlias("test_shift").stream().findFirst().orElse(null);
aliases = adminClient.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));

View file

@ -84,9 +84,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().addMetric(NodesInfoRequest.Metric.TRANSPORT.metricName());
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
TransportAddress address = response.getNodes().get(0).getTransport().getAddress().publishAddress();
TransportAddress address = response.getNodes().get(0).getNode().getAddress();
helper.host = address.address().getHostName();
helper.port = address.address().getPort();
try {

View file

@ -9,5 +9,9 @@
<Root level="info">
<AppenderRef ref="Console" />
</Root>
<!-- Remove WARN on elastic embedded because dangling indices cannot be detected -->
<Logger name="org.elasticsearch.gateway.DanglingIndicesState" level="error" />
<!-- Remove WARN on elastic embedded because node and cluster identifiers change between test suites -->
<Logger name="org.elasticsearch.cluster.service.ClusterApplierService" level="error" />
</Loggers>
</configuration>

View file

@ -1,15 +1,14 @@
group = org.xbib
name = elx
version = 7.6.1.3
version = 7.10.2.0
gradle.wrapper.version = 6.4.1
gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0
xbib-netty-http.version = 4.1.50.0
elasticsearch.version = 7.6.1
# ES 7.6.1 uses Jackson 2.8.11
jackson.version = 2.11.0
netty.version = 4.1.50.Final
tcnative.version = 2.0.29.Final
tcnative-legacy-macosx.version = 2.0.26.Final
xbib-netty-http.version = 4.1.58.0
elasticsearch.version = 7.10.2
# ES 7.10.2.1 uses Jackson 2.10.4
jackson.version = 2.12.1
netty.version = 4.1.58.Final
tcnative.version = 2.0.36.Final
bouncycastle.version = 1.64
log4j.version = 2.13.3
log4j.version = 2.14.0

Binary file not shown.

View file

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

2
gradlew vendored
View file

@ -130,7 +130,7 @@ fi
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath

21
gradlew.bat vendored
View file

@ -40,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@ -54,7 +54,7 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
@ -64,21 +64,6 @@ echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
:execute
@rem Setup the command line
@ -86,7 +71,7 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell