new elx implementation for Elasticsearch 7.6.1

This commit is contained in:
Jörg Prante 2020-05-10 17:40:58 +02:00
parent 3cc4881f19
commit a5c8dc8b5f
56 changed files with 546 additions and 661 deletions

View file

@ -7,7 +7,7 @@ plugins {
}
if (JavaVersion.current() < JavaVersion.VERSION_11) {
throw new GradleException("This build must be run with java 11")
throw new GradleException("This build must be run with Java/OpenJDK 11+")
}
subprojects {
@ -50,16 +50,15 @@ subprojects {
test {
enabled = true
useJUnitPlatform()
doFirst {
// for Lucene to access jdk.internal.ref and jdk.internal.misc in Java 11+
jvmArgs = [
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED'
]
}
systemProperty 'java.util.logging.manager', 'org.apache.logging.log4j.jul.LogManager'
systemProperty 'path.home', "${project.buildDir}/"
systemProperty 'jna.debug_load', 'true'
systemProperty 'path.home', "${project.buildDir}/"
failFast = true
testLogging {
events 'PASSED', 'FAILED', 'SKIPPED'

View file

@ -1,4 +1,4 @@
dependencies {
compile "org.xbib:metrics-common:${project.property('xbib-metrics.version')}"
compile "org.xbib.elasticsearch:elasticsearch:${rootProject.property('elasticsearch-server.version')}"
compile "org.elasticsearch:elasticsearch:${rootProject.property('elasticsearch.version')}"
}

View file

@ -12,8 +12,6 @@ public interface BulkProcessor extends Closeable, Flushable {
BulkProcessor add(ActionRequest request);
BulkProcessor add(ActionRequest request, Object payload);
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;

View file

@ -6,6 +6,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.Closeable;
import java.io.Flushable;
@ -126,8 +127,9 @@ public interface ExtendedClient extends Flushable, Closeable {
* @param id the id
* @param source the source
* @return this
* @throws IOException if update fails
*/
ExtendedClient update(String index, String id, BytesReference source);
ExtendedClient update(String index, String id, BytesReference source) throws IOException;
/**
* Update document. Use with precaution! Does not work in all cases.
@ -199,7 +201,18 @@ public interface ExtendedClient extends Flushable, Closeable {
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException;
ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @param mapping mapping
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException;
/**
* Create a new index.

View file

@ -1,5 +1,6 @@
dependencies{
compile project(':elx-api')
testCompile "org.xbib.elasticsearch:elasticsearch-analysis-common:${rootProject.property('elasticsearch-server.version')}"
testCompile "org.xbib.elasticsearch:transport-netty4:${rootProject.property('elasticsearch-server.version')}"
testCompile "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
testCompile "io.netty:netty-codec-http:${project.property('netty.version')}"
testCompile "io.netty:netty-transport:${project.property('netty.version')}"
}

View file

@ -5,6 +5,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@ -14,7 +15,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
@ -46,6 +45,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -53,7 +53,6 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -89,8 +88,8 @@ import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -113,22 +112,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName());
/**
* The one and only index type name used in the extended client.
* Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
*/
private static final String TYPE_NAME = "doc";
/**
* The Elasticsearch client.
*/
private ElasticsearchClient client;
private BulkMetric bulkMetric;
private BulkController bulkController;
private AtomicBoolean closed;
private final AtomicBoolean closed;
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
@Override
@ -195,7 +185,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public AbstractExtendedClient init(Settings settings) throws IOException {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
logger.info("initializing with settings = " + settings.toDelimitedString(','));
if (client == null) {
client = createClient(settings);
}
@ -303,7 +293,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings) throws IOException {
return newIndex(index, settings, (Map<String, Object>) null);
return newIndex(index, settings, (Map<String, ?>) null);
}
@Override
@ -314,7 +304,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException {
public ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException {
ensureActive();
if (index == null) {
logger.warn("no index name given to create index");
@ -325,14 +315,38 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
createIndexRequest.settings(settings);
}
if (mapping != null) {
createIndexRequest.mapping(TYPE_NAME, mapping);
createIndexRequest.mapping("_doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
XContentBuilder builder = XContentFactory.jsonBuilder();
logger.info("index {} created: {}", index,
Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS)));
if (createIndexResponse.isAcknowledged()) {
return this;
}
throw new IllegalStateException("index creation not acknowledged");
}
@Override
public ExtendedClient newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
ensureActive();
if (index == null) {
logger.warn("no index name given to create index");
return this;
}
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
if (settings != null) {
createIndexRequest.settings(settings);
}
if (mapping != null) {
if (mapping.size() != 1) {
throw new IllegalArgumentException("mapping invalid, just use 'doc' for mapping");
}
createIndexRequest.mapping("_doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
if (createIndexResponse.isAcknowledged()) {
return this;
}
throw new IllegalStateException("index creation not acknowledged");
}
@Override
public ExtendedClient deleteIndex(IndexDefinition indexDefinition) {
@ -387,13 +401,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(String index, String id, boolean create, String source) {
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
return index(new IndexRequest().index(index).id(id).create(create)
.source(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
}
@Override
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
return index(new IndexRequest().index(index).id(id).create(create)
.source(source, XContentType.JSON));
}
@ -406,7 +420,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient delete(String index, String id) {
return delete(new DeleteRequest(index, TYPE_NAME, id));
return delete(new DeleteRequest().index(index).id(id));
}
@Override
@ -417,14 +431,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public ExtendedClient update(String index, String id, BytesReference source) {
return update(new UpdateRequest(index, TYPE_NAME, id)
public ExtendedClient update(String index, String id, BytesReference source) throws IOException {
return update(new UpdateRequest().index(index).id(id)
.doc(source, XContentType.JSON));
}
@Override
public ExtendedClient update(String index, String id, String source) {
return update(new UpdateRequest(index, TYPE_NAME, id)
return update(new UpdateRequest().index(index).id(id)
.doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
}
@ -458,7 +472,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
logger.error("timeout waiting for recovery");
logger.warn("timeout waiting for recovery");
return false;
}
}
@ -473,9 +487,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
if (logger.isErrorEnabled()) {
logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
}
logger.warn("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
return false;
}
return true;
@ -693,7 +705,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
if (!indicesAliasesRequest.getAliasActions().isEmpty()) {
logger.debug("indices alias request = {}", indicesAliasesRequest.getAliasActions().toString());
IndicesAliasesResponse indicesAliasesResponse =
AcknowledgedResponse indicesAliasesResponse =
client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet();
logger.debug("response isAcknowledged = {} isFragment = {}",
indicesAliasesResponse.isAcknowledged(), indicesAliasesResponse.isFragment());
@ -719,7 +731,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
logger.info("{} indices", getIndexResponse.getIndices().length);
logger.info("pruneIndex: total of {} indices", getIndexResponse.getIndices().length);
List<String> candidateIndices = new ArrayList<>();
for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s);
@ -754,21 +766,30 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
String[] s = new String[indicesToDelete.size()];
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest()
.indices(indicesToDelete.toArray(s));
DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
AcknowledgedResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
if( response.isAcknowledged()) {
logger.log(Level.INFO, "deletion of {} acknowledged, waiting for GREEN", Arrays.asList(s));
waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
return new SuccessPruneResult(candidateIndices, indicesToDelete, response);
} else {
logger.log(Level.WARN, "deletion of {} not acknowledged", Arrays.asList(s));
return new FailPruneResult(candidateIndices, indicesToDelete, response);
}
}
@Override
public Long mostRecentDocument(String index, String timestampfieldname) {
ensureActive();
SortBuilder<?> sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.sort(sort);
builder.storedField(timestampfieldname);
builder.size(1);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.source(builder);
SortBuilder<?> sort = SortBuilders
.fieldSort(timestampfieldname)
.order(SortOrder.DESC);
SearchSourceBuilder builder = new SearchSourceBuilder()
.sort(sort)
.storedField(timestampfieldname)
.size(1);
SearchRequest searchRequest = new SearchRequest()
.indices(index)
.source(builder);
SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
if (searchResponse.getHits().getHits().length == 1) {
@ -912,17 +933,17 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private void checkMapping(String index, String type, MappingMetaData mappingMetaData) {
try {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
builder.size(0);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.types(type);
searchRequest.source(builder);
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices(index)
.source(builder);
SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
long total = searchResponse.getHits().getTotalHits();
if (total > 0L) {
TotalHits total = searchResponse.getHits().getTotalHits();
if (total.value > 0L) {
Map<String, Long> fields = new TreeMap<>();
Map<String, Object> root = mappingMetaData.getSourceAsMap();
checkMapping(index, type, "", "", root, fields);
@ -930,9 +951,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
Map<String, Long> map = sortByValue(fields);
map.forEach((key, value) -> {
logger.info("{} {} {}",
key,
value,
(double) value * 100 / total);
key, value, (double) value * 100 / total.value);
if (value == 0) {
empty.incrementAndGet();
}
@ -976,23 +995,23 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
} else if ("type".equals(key)) {
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder);
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(queryBuilder);
builder.size(0);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.types(type);
searchRequest.source(builder);
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(queryBuilder)
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices(index)
.source(builder);
SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
fields.put(path, searchResponse.getHits().getTotalHits());
fields.put(path, searchResponse.getHits().getTotalHits().value);
}
}
}
private static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) {
Map<K, V> result = new LinkedHashMap<>();
map.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getValue))
map.entrySet().stream().sorted(Map.Entry.comparingByValue())
.forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
return result;
}
@ -1046,10 +1065,46 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
List<String> indicesToDelete;
DeleteIndexResponse response;
AcknowledgedResponse response;
SuccessPruneResult(List<String> candidateIndices, List<String> indicesToDelete,
DeleteIndexResponse response) {
AcknowledgedResponse response) {
this.candidateIndices = candidateIndices;
this.indicesToDelete = indicesToDelete;
this.response = response;
}
@Override
public IndexPruneResult.State getState() {
return IndexPruneResult.State.SUCCESS;
}
@Override
public List<String> getCandidateIndices() {
return candidateIndices;
}
@Override
public List<String> getDeletedIndices() {
return indicesToDelete;
}
@Override
public boolean isAcknowledged() {
return response.isAcknowledged();
}
}
private static class FailPruneResult implements IndexPruneResult {
List<String> candidateIndices;
List<String> indicesToDelete;
AcknowledgedResponse response;
FailPruneResult(List<String> candidateIndices, List<String> indicesToDelete,
AcknowledgedResponse response) {
this.candidateIndices = candidateIndices;
this.indicesToDelete = indicesToDelete;
this.response = response;

View file

@ -125,7 +125,7 @@ public class DefaultBulkController implements BulkController {
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id());
}
bulkProcessor.add(indexRequest);
} catch (Exception e) {
@ -144,7 +144,7 @@ public class DefaultBulkController implements BulkController {
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id());
}
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
@ -163,7 +163,7 @@ public class DefaultBulkController implements BulkController {
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id());
}
bulkProcessor.add(updateRequest);
} catch (Exception e) {

View file

@ -140,19 +140,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
*/
@Override
public DefaultBulkProcessor add(ActionRequest request) {
return add(request, null);
}
/**
* Adds either a delete or an index request with a payload.
*
* @param request request
* @param payload payload
* @return his bulk processor
*/
@Override
public DefaultBulkProcessor add(ActionRequest request, Object payload) {
internalAdd(request, payload);
internalAdd(request);
return this;
}
@ -186,14 +174,14 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
private synchronized void internalAdd(ActionRequest request, Object payload) {
private synchronized void internalAdd(ActionRequest request) {
ensureOpen();
if (request instanceof IndexRequest) {
bulkRequest.add((IndexRequest) request, payload);
bulkRequest.add((IndexRequest) request);
} else if (request instanceof DeleteRequest) {
bulkRequest.add((DeleteRequest) request, payload);
bulkRequest.add((DeleteRequest) request);
} else if (request instanceof UpdateRequest) {
bulkRequest.add((UpdateRequest) request, payload);
bulkRequest.add((UpdateRequest) request);
} else {
throw new UnsupportedOperationException();
}

View file

@ -6,10 +6,12 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
import java.util.Map;
class MockNode extends Node {
MockNode(Settings settings, List<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);
super(InternalSettingsPreparer.prepareEnvironment(settings, Map.of(), null, () -> "mock"),
classpathPlugins, false);
}
}

View file

@ -33,7 +33,7 @@ class SearchTest {
ElasticsearchClient client = helper.client("1");
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1000; i++) {
IndexRequest indexRequest = new IndexRequest("pages", "row")
IndexRequest indexRequest = new IndexRequest().index("pages")
.source(XContentFactory.jsonBuilder()
.startObject()
.field("user1", "joerg")
@ -61,10 +61,9 @@ class SearchTest {
searchSource.size(10);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("pages");
searchRequest.types("row");
searchRequest.source(searchSource);
SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
assertTrue(searchResponse.getHits().getTotalHits() > 0);
assertTrue(searchResponse.getHits().getTotalHits().value > 0);
}
}
}

View file

@ -41,13 +41,13 @@ class SimpleTest {
Settings indexSettings = Settings.builder()
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
.put("index.analysis.analyzer.default.type", "keyword")
.build();
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index("test").settings(indexSettings);
helper.client("1").execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
IndexRequest indexRequest = new IndexRequest();
indexRequest.index("test").type("test").id("1")
indexRequest.index("test").id("1")
.source(XContentFactory.jsonBuilder().startObject().field("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject());
helper.client("1").execute(IndexAction.INSTANCE, indexRequest).actionGet();
@ -58,7 +58,7 @@ class SimpleTest {
builder.query(QueryBuilders.matchQuery("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8"));
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test").types("test");
searchRequest.indices("test");
searchRequest.source(builder);
String doc = helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet()
.getHits().getAt(0).getSourceAsString();

View file

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -22,7 +21,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@ -37,7 +36,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -188,8 +187,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("discovery.zen.master_election.ignore_non_master_pings", "true")
.put("transport.netty.epoll", "false")
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
.build();
}
@ -215,7 +214,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(getNodeSettings())
.put("node.name", id)
.build();
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);

View file

@ -45,20 +45,20 @@ class WildcardTest {
}
private void index(ElasticsearchClient client, String id, String fieldValue) throws IOException {
client.execute(IndexAction.INSTANCE, new IndexRequest("index", "type", id)
client.execute(IndexAction.INSTANCE, new IndexRequest().index("index").id(id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject()))
.actionGet();
client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet();
}
private long count(ElasticsearchClient client, QueryBuilder queryBuilder) {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(queryBuilder);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("index");
searchRequest.types("type");
searchRequest.source(builder);
return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(queryBuilder)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("index")
.source(builder);
return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits().value;
}
private void validateCount(ElasticsearchClient client, QueryBuilder queryBuilder, long expectedHits) {

View file

@ -2,7 +2,7 @@ import org.apache.tools.ant.taskdefs.condition.Os
dependencies{
compile project(':elx-common')
compile "org.xbib.elasticsearch:transport-netty4:${rootProject.property('elasticsearch-server.version')}"
compile "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
compile "org.xbib:netty-http-client:${project.property('xbib-netty-http.version')}"
runtime "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
if (Os.isFamily(Os.FAMILY_MAC)) {
@ -10,5 +10,4 @@ dependencies{
} else if (Os.isFamily(Os.FAMILY_UNIX)) {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
}
testCompile "org.xbib.elasticsearch:elasticsearch-analysis-common:${rootProject.property('elasticsearch-server.version')}"
}

View file

@ -3,13 +3,12 @@ package org.xbib.elx.http;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
@ -43,7 +42,7 @@ public class ExtendedHttpClient extends AbstractExtendedClient implements Elasti
private final NamedXContentRegistry registry;
@SuppressWarnings("rawtypes")
private final Map<GenericAction, HttpAction> actionMap;
private final Map<ActionType, HttpAction> actionMap;
private String url;
@ -109,28 +108,19 @@ public class ExtendedHttpClient extends AbstractExtendedClient implements Elasti
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> ActionFuture<Response>
execute(Action<Request, Response, RequestBuilder> action, Request request) {
public <Request extends ActionRequest, Response extends ActionResponse>
ActionFuture<Response> execute(ActionType<Response> action, Request request) {
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void
execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
public <Request extends ActionRequest, Response extends ActionResponse>
void execute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
doExecute(action, request, listener);
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> RequestBuilder
prepareExecute(Action<Request, Response, RequestBuilder> action) {
return action.newRequestBuilder(this);
}
@Override
public ThreadPool threadPool() {
logger.log(Level.DEBUG, "returning null for threadPool() request");
@ -138,8 +128,8 @@ public class ExtendedHttpClient extends AbstractExtendedClient implements Elasti
}
@SuppressWarnings({"unchecked", "rawtypes"})
private <R extends ActionRequest, T extends ActionResponse, B extends ActionRequestBuilder<R, T, B>>
void doExecute(Action<R, T, B> action, R request, ActionListener<T> listener) {
private <R extends ActionRequest, T extends ActionResponse, B extends ActionRequestBuilder<R, T>>
void doExecute(ActionType<T> action, R request, ActionListener<T> listener) {
HttpAction httpAction = actionMap.get(action);
if (httpAction == null) {
throw new IllegalStateException("failed to find http action [" + action + "] to execute");

View file

@ -12,7 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -25,6 +25,7 @@ import org.elasticsearch.rest.RestStatus;
import org.xbib.net.URL;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.client.api.Transport;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -47,7 +48,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
this.settings = settings;
}
public abstract GenericAction<R, T> getActionInstance();
public abstract ActionType<T> getActionInstance();
final void execute(HttpActionContext<R, T> httpActionContext, ActionListener<T> listener) throws IOException {
try {
@ -71,7 +72,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
} else {
ElasticsearchStatusException statusException = parseToError(httpActionContext);
if (statusException.status().equals(RestStatus.NOT_FOUND)) {
listener.onResponse(emptyResponse());
listener.onResponse(parseToResponse(httpActionContext));
} else {
listener.onFailure(statusException);
}
@ -158,7 +159,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
.createParser(httpActionContext.getExtendedHttpClient().getRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
httpActionContext.getHttpResponse().getBody().toString(StandardCharsets.UTF_8))) {
return entityParser().apply(parser);
return entityParser(httpActionContext.getHttpResponse()).apply(parser);
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
@ -166,12 +167,13 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
}
protected ElasticsearchStatusException parseToError(HttpActionContext<R, T> httpActionContext) {
// we assume a non-empty, valid JSON response body. If there is none, this method must be overriden.
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(httpActionContext.getExtendedHttpClient().getRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
httpActionContext.getHttpResponse().getBody().toString(StandardCharsets.UTF_8))) {
return errorParser().apply(parser);
} catch (IOException e) {
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ElasticsearchStatusException(e.getMessage(), RestStatus.INTERNAL_SERVER_ERROR, e);
}
@ -183,8 +185,5 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
protected abstract Request.Builder createHttpRequest(String baseUrl, R request) throws IOException;
protected abstract CheckedFunction<XContentParser, T, IOException> entityParser();
protected abstract T emptyResponse();
protected abstract CheckedFunction<XContentParser, T, IOException> entityParser(HttpResponse httpResponse);
}

View file

@ -7,6 +7,7 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
@ -23,12 +24,7 @@ public class HttpClusterHealthAction extends HttpAction<ClusterHealthRequest, Cl
}
@Override
protected CheckedFunction<XContentParser, ClusterHealthResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, ClusterHealthResponse, IOException> entityParser(HttpResponse httpResponse) {
return HttpClusterHealthResponse::fromXContent;
}
@Override
protected ClusterHealthResponse emptyResponse() {
return new HttpClusterHealthResponse();
}
}

View file

@ -6,36 +6,13 @@ import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class HttpClusterHealthResponse extends ClusterHealthResponse {
private static final String CLUSTER_NAME = "cluster_name";
private static final String STATUS = "status";
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis";
private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number";
private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String INDICES = "indices";
private String clusterName;
private ClusterStateHealth clusterStateHealth;
@ -93,11 +70,6 @@ public class HttpClusterHealthResponse extends ClusterHealthResponse {
return clusterName;
}
@Override
public ClusterStateHealth getClusterStateHealth() {
return clusterStateHealth;
}
@Override
public boolean isTimedOut() {
return this.timedOut;
@ -172,44 +144,4 @@ public class HttpClusterHealthResponse extends ClusterHealthResponse {
public RestStatus status() {
return isTimedOut() ? RestStatus.REQUEST_TIMEOUT : RestStatus.OK;
}
public static HttpClusterHealthResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
Map<String, Object> map = parser.map();
String clusterName = (String) map.get(CLUSTER_NAME);
ClusterHealthStatus status = ClusterHealthStatus.fromString((String) map.get(STATUS));
Boolean timedOut = (Boolean) map.get(TIMED_OUT);
Integer numberOfNodes = (Integer) map.get(NUMBER_OF_NODES);
Integer numberOfDataNodes = (Integer) map.get(NUMBER_OF_DATA_NODES);
Integer activePrimaryShards = (Integer) map.get(ACTIVE_PRIMARY_SHARDS);
Integer activeShards = (Integer) map.get(ACTIVE_SHARDS);
Integer relocatingShards = (Integer) map.get(RELOCATING_SHARDS);
Integer initializingShards = (Integer) map.get(INITIALIZING_SHARDS);
Integer unassignedShards = (Integer) map.get(UNASSIGNED_SHARDS);
Integer delayedUnassignedShards = (Integer) map.get(DELAYED_UNASSIGNED_SHARDS);
Integer numberOfPendingTasks = (Integer) map.get(NUMBER_OF_PENDING_TASKS);
Integer numberOfInFlightFetch = (Integer) map.get(NUMBER_OF_IN_FLIGHT_FETCH);
Integer taskMaxWaitingInQueueMillis = (Integer) map.get(TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS);
Double activeShardsPercentAsNumber = (Double) map.get(ACTIVE_SHARDS_PERCENT_AS_NUMBER);
HttpClusterHealthResponse clusterHealthResponse = new HttpClusterHealthResponse();
clusterHealthResponse.init(clusterName,
status,
timedOut,
numberOfNodes,
numberOfDataNodes,
Collections.emptyMap(),
activePrimaryShards,
activeShards,
relocatingShards,
initializingShards,
unassignedShards,
delayedUnassignedShards,
numberOfPendingTasks,
numberOfInFlightFetch,
TimeValue.timeValueMillis(taskMaxWaitingInQueueMillis),
activeShardsPercentAsNumber
);
return clusterHealthResponse;
}
}

View file

@ -1,39 +1,17 @@
package org.xbib.elx.http.action.admin.cluster.node.info;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
import org.xbib.elx.http.HttpAction;
import org.xbib.elx.http.HttpActionContext;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*
@ -95,84 +73,7 @@ public class HttpNodesInfoAction extends HttpAction<NodesInfoRequest, NodesInfoR
}
@Override
protected CheckedFunction<XContentParser, NodesInfoResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, NodesInfoResponse, IOException> entityParser(HttpResponse httpResponse) {
throw new UnsupportedOperationException();
}
@Override
protected NodesInfoResponse emptyResponse() {
return new NodesInfoResponse();
}
@SuppressWarnings("unchecked")
protected NodesInfoResponse createResponse(HttpActionContext<NodesInfoRequest, NodesInfoResponse> httpContext) {
// BROKEN
Map<String, Object> map = null;
//String string = (String)map.get("cluster_name");
ClusterName clusterName = new ClusterName("");
List<NodeInfo> nodeInfoList = new LinkedList<>();
//map = (Map<String, Object>)map.get("nodes");
for (Map.Entry<String, Object> entry : map.entrySet()) {
String nodeId = entry.getKey();
String ephemeralId = null;
Map<String,Object> map2 = (Map<String, Object>) entry.getValue();
String nodeName = (String) map2.get("name");
String hostName = (String) map2.get("host");
String hostAddress = (String) map2.get("ip");
// <host>[/<ip>][:<port>]
String transportAddressString = (String) map2.get("transport_address");
int pos = transportAddressString.indexOf(':');
String host = pos > 0 ? transportAddressString.substring(0, pos) : transportAddressString;
int port = Integer.parseInt(pos > 0 ? transportAddressString.substring(pos + 1) : "0");
pos = host.indexOf('/');
host = pos > 0 ? host.substring(0, pos) : host;
try {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
TransportAddress transportAddress = new TransportAddress(inetAddresses[0], port);
Build build = new Build(Build.Flavor.OSS, Build.Type.TAR,
(String) map2.get("build"),
(String) map2.get("date"),
(Boolean) map2.get("snapshot"));
Map<String, String> attributes = Collections.emptyMap();
Set<DiscoveryNode.Role> roles = new HashSet<>();
Version version = Version.fromString((String) map2.get("version"));
DiscoveryNode discoveryNode = new DiscoveryNode(nodeName, nodeId, ephemeralId, hostName, hostAddress,
transportAddress,
attributes, roles, version);
/*Map<String, String> settingsMap = map2.containsKey("settings") ?
XContentHelper.
SettingsLoader.Helper.loadNestedFromMap((Map<String, Object>) map2.get("settings")) :
Collections.emptyMap();
Settings settings = Settings.builder()
.put(settingsMap)
.build();*/
OsInfo os = null;
ProcessInfo processInfo = null;
JvmInfo jvmInfo = null;
ThreadPoolInfo threadPoolInfo = null;
TransportInfo transportInfo = null;
HttpInfo httpInfo = null;
PluginsAndModules pluginsAndModules = null;
IngestInfo ingestInfo = null;
ByteSizeValue totalIndexingBuffer = null;
NodeInfo nodeInfo = new NodeInfo(version,
build,
discoveryNode,
//serviceAttributes,
//settings,
null,
os, processInfo, jvmInfo, threadPoolInfo, transportInfo, httpInfo, pluginsAndModules,
ingestInfo,
totalIndexingBuffer);
nodeInfoList.add(nodeInfo);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
List<FailedNodeException> failures = null;
return new NodesInfoResponse(clusterName, nodeInfoList, failures);
}
}

View file

@ -10,6 +10,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -40,12 +41,7 @@ public class HttpClusterUpdateSettingsAction extends HttpAction<ClusterUpdateSet
}
@Override
protected CheckedFunction<XContentParser, ClusterUpdateSettingsResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, ClusterUpdateSettingsResponse, IOException> entityParser(HttpResponse httpResponse) {
return ClusterUpdateSettingsResponse::fromXContent;
}
@Override
protected ClusterUpdateSettingsResponse emptyResponse() {
return new ClusterUpdateSettingsResponse();
}
}

View file

@ -1,7 +1,6 @@
package org.xbib.elx.http.action.admin.cluster.state;
import com.carrotsearch.hppc.LongArrayList;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -20,6 +19,7 @@ import org.elasticsearch.common.xcontent.NamedObjectNotFoundException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.util.ArrayList;
@ -73,15 +73,10 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
}
@Override
protected CheckedFunction<XContentParser, ClusterStateResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, ClusterStateResponse, IOException> entityParser(HttpResponse httpResponse) {
return this::fromXContent;
}
@Override
protected ClusterStateResponse emptyResponse() {
return new ClusterStateResponse();
}
private ClusterStateResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
@ -109,7 +104,7 @@ public class HttpClusterStateAction extends HttpAction<ClusterStateRequest, Clus
}
ClusterState clusterState = builder.build();
return new ClusterStateResponse(clusterName, clusterState, length);
return new ClusterStateResponse(clusterName, clusterState, true);
}
private MetaData metadataFromXContent(XContentParser parser) throws IOException {

View file

@ -3,7 +3,7 @@ package org.xbib.elx.http.action.admin.indices.alias;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
@ -12,10 +12,11 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpIndicesAliasesAction extends HttpAction<IndicesAliasesRequest, IndicesAliasesResponse> {
public class HttpIndicesAliasesAction extends HttpAction<IndicesAliasesRequest, AcknowledgedResponse> {
@Override
public IndicesAliasesAction getActionInstance() {
@ -37,12 +38,7 @@ public class HttpIndicesAliasesAction extends HttpAction<IndicesAliasesRequest,
}
@Override
protected CheckedFunction<XContentParser, IndicesAliasesResponse, IOException> entityParser() {
return IndicesAliasesResponse::fromXContent;
}
@Override
protected IndicesAliasesResponse emptyResponse() {
return new IndicesAliasesResponse();
protected CheckedFunction<XContentParser, AcknowledgedResponse, IOException> entityParser(HttpResponse httpResponse) {
return AcknowledgedResponse::fromXContent;
}
}

View file

@ -1,6 +1,6 @@
package org.xbib.elx.http.action.admin.indices.alias.get;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
@ -10,6 +10,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.util.ArrayList;
@ -29,21 +30,15 @@ public class HttpGetAliasAction extends HttpAction<GetAliasesRequest, GetAliases
}
@Override
public GenericAction<GetAliasesRequest, GetAliasesResponse> getActionInstance() {
public ActionType<GetAliasesResponse> getActionInstance() {
return GetAliasesAction.INSTANCE;
}
@Override
protected CheckedFunction<XContentParser, GetAliasesResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, GetAliasesResponse, IOException> entityParser(HttpResponse httpResponse) {
return this::fromXContent;
}
@Override
protected GetAliasesResponse emptyResponse() {
ImmutableOpenMap.Builder<String, List<AliasMetaData>> aliasesBuilder = ImmutableOpenMap.builder();
return new GetAliasesResponse(aliasesBuilder.build());
}
private GetAliasesResponse fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) {
parser.nextToken();

View file

@ -1,21 +1,32 @@
package org.xbib.elx.http.action.admin.indices.create;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.util.Map;
public class HttpCreateIndexAction extends HttpAction<CreateIndexRequest, CreateIndexResponse> {
public static final ParseField MAPPINGS = new ParseField("mappings");
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField ALIASES = new ParseField("aliases");
@Override
public CreateIndexAction getActionInstance() {
return CreateIndexAction.INSTANCE;
@ -23,18 +34,40 @@ public class HttpCreateIndexAction extends HttpAction<CreateIndexRequest, Create
@Override
protected Request.Builder createHttpRequest(String url, CreateIndexRequest createIndexRequest) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder = createIndexRequest.toXContent(builder, ToXContent.EMPTY_PARAMS);
return newPutRequest(url, "/" + createIndexRequest.index(), BytesReference.bytes(builder));
XContentBuilder builder = toXContent(createIndexRequest, XContentFactory.jsonBuilder());
return newPutRequest(url, "/" + createIndexRequest.index() + "?include_type_name=true",
BytesReference.bytes(builder));
}
@Override
protected CheckedFunction<XContentParser, CreateIndexResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, CreateIndexResponse, IOException> entityParser(HttpResponse httpResponse) {
return CreateIndexResponse::fromXContent;
}
@Override
protected CreateIndexResponse emptyResponse() {
return new CreateIndexResponse();
// fixed version from CreateIndexRequest - use only one mapping
private XContentBuilder toXContent(CreateIndexRequest createIndexRequest,
XContentBuilder builder) throws IOException {
builder.startObject();
builder.startObject(SETTINGS.getPreferredName());
createIndexRequest.settings().toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
// there is only an empty or a single entry for mappings
if (createIndexRequest.mappings().isEmpty()) {
// ES wants a mappings element with an empty map
builder.startObject(MAPPINGS.getPreferredName());
builder.endObject();
} else {
Map<String, ?> mappingAsMap = createIndexRequest.mappings();
String mappingString = mappingAsMap.values().iterator().next().toString();
builder.field(MAPPINGS.getPreferredName());
builder.map(XContentHelper.convertToMap(new BytesArray(mappingString), false, XContentType.JSON).v2());
}
builder.startObject(ALIASES.getPreferredName());
for (Alias alias : createIndexRequest.aliases()) {
alias.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
builder.endObject();
return builder;
}
}

View file

@ -2,15 +2,16 @@ package org.xbib.elx.http.action.admin.indices.delete;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpDeleteIndexAction extends HttpAction<DeleteIndexRequest, DeleteIndexResponse> {
public class HttpDeleteIndexAction extends HttpAction<DeleteIndexRequest, AcknowledgedResponse> {
@Override
public DeleteIndexAction getActionInstance() {
@ -23,12 +24,7 @@ public class HttpDeleteIndexAction extends HttpAction<DeleteIndexRequest, Delete
}
@Override
protected CheckedFunction<XContentParser, DeleteIndexResponse, IOException> entityParser() {
return DeleteIndexResponse::fromXContent;
}
@Override
protected DeleteIndexResponse emptyResponse() {
return new DeleteIndexResponse();
protected CheckedFunction<XContentParser, AcknowledgedResponse, IOException> entityParser(HttpResponse httpResponse) {
return AcknowledgedResponse::fromXContent;
}
}

View file

@ -10,6 +10,7 @@ import org.elasticsearch.rest.RestStatus;
import org.xbib.elx.http.HttpAction;
import org.xbib.elx.http.HttpActionContext;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
@ -26,22 +27,27 @@ public class HttpIndicesExistsAction extends HttpAction<IndicesExistsRequest, In
return newHeadRequest(url, index);
}
@Override
protected CheckedFunction<XContentParser, IndicesExistsResponse, IOException> entityParser() {
return this::fromXContent;
}
@Override
protected IndicesExistsResponse emptyResponse() {
return new IndicesExistsResponse(false); // used for 404 Not found
}
/**
* Override for non-body logic.
* @param httpActionContext the HTTP action context
* @return the ELasticsearch sttatus exception
*/
@Override
protected ElasticsearchStatusException parseToError(HttpActionContext<IndicesExistsRequest, IndicesExistsResponse> httpActionContext) {
return new ElasticsearchStatusException("not found", RestStatus.NOT_FOUND);
}
private IndicesExistsResponse fromXContent(XContentParser parser) throws IOException {
return new IndicesExistsResponse(true); // used for 200 OK
@Override
protected CheckedFunction<XContentParser, IndicesExistsResponse, IOException> entityParser(HttpResponse httpResponse) {
return httpResponse.getStatus().getCode() == 200 ? this::found : this::notfound;
}
private IndicesExistsResponse found(XContentParser parser) {
return new IndicesExistsResponse(true);
}
private IndicesExistsResponse notfound(XContentParser parser) {
return new IndicesExistsResponse(false);
}
}

View file

@ -1,5 +1,8 @@
package org.elasticsearch.action.admin.indices.mapping.get;
package org.xbib.elx.http.action.admin.indices.mapping.get;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
@ -7,12 +10,10 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public class HttpGetMappingsAction extends HttpAction<GetMappingsRequest, GetMappingsResponse> {
private static final ParseField MAPPINGS = new ParseField("mappings");
@ -29,31 +30,24 @@ public class HttpGetMappingsAction extends HttpAction<GetMappingsRequest, GetMap
}
@Override
protected CheckedFunction<XContentParser, GetMappingsResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, GetMappingsResponse, IOException> entityParser(HttpResponse httpResponse) {
return this::fromXContent;
}
@Override
protected GetMappingsResponse emptyResponse() {
return new GetMappingsResponse();
}
// fixed version from GetMappingsRequest - use only one mapping per index with type "_doc"
@SuppressWarnings("unchecked")
private GetMappingsResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
Map<String, Object> parts = parser.map();
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> builder = new ImmutableOpenMap.Builder<>();
for (Map.Entry<String, Object> entry : parts.entrySet()) {
String indexName = entry.getKey();
Map<String, Object> mapping = (Map<String, Object>) ((Map) entry.getValue()).get(MAPPINGS.getPreferredName());
ImmutableOpenMap.Builder<String, MappingMetaData> typeBuilder = new ImmutableOpenMap.Builder<>();
for (Map.Entry<String, Object> typeEntry : mapping.entrySet()) {
String typeName = typeEntry.getKey();
Map<String, Object> fieldMappings = (Map<String, Object>) typeEntry.getValue();
MappingMetaData mmd = new MappingMetaData(typeName, fieldMappings);
typeBuilder.put(typeName, mmd);
if (parser.currentToken() == null) {
parser.nextToken();
}
Map<String, Object> map = parser.map();
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> builder = new ImmutableOpenMap.Builder<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
final String indexName = entry.getKey();
final Map<String, Object> mapping = (Map<String, Object>) ((Map<String, Object>) entry.getValue()).get(MAPPINGS.getPreferredName());
ImmutableOpenMap.Builder<String, MappingMetaData> typeBuilder = new ImmutableOpenMap.Builder<>();
MappingMetaData mmd = new MappingMetaData("_doc", mapping);
typeBuilder.put("_doc", mmd);
builder.put(indexName, typeBuilder.build());
}
return new GetMappingsResponse(builder.build());

View file

@ -7,6 +7,7 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
@ -24,12 +25,7 @@ public class HttpRefreshIndexAction extends HttpAction<RefreshRequest, RefreshRe
}
@Override
protected CheckedFunction<XContentParser, RefreshResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, RefreshResponse, IOException> entityParser(HttpResponse httpResponse) {
return RefreshResponse::fromXContent;
}
@Override
protected RefreshResponse emptyResponse() {
return new RefreshResponse();
}
}

View file

@ -4,16 +4,12 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class HttpGetSettingsAction extends HttpAction<GetSettingsRequest, GetSettingsResponse> {
@ -30,62 +26,7 @@ public class HttpGetSettingsAction extends HttpAction<GetSettingsRequest, GetSet
}
@Override
protected CheckedFunction<XContentParser, GetSettingsResponse, IOException> entityParser() {
return this::fromXContent;
}
@Override
protected GetSettingsResponse emptyResponse() {
ImmutableOpenMap<String, Settings> settingsMap = ImmutableOpenMap.<String, Settings>builder().build();
return new GetSettingsResponse(settingsMap);
}
private GetSettingsResponse fromXContent(XContentParser parser) throws IOException {
Map<String, Settings> indexToSettings = new HashMap<>();
Map<String, Settings> indexToDefaultSettings = new HashMap<>();
if (parser.currentToken() == null) {
parser.nextToken();
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
parser.nextToken();
while (!parser.isClosed()) {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parseIndexEntry(parser, indexToSettings, indexToDefaultSettings);
} else if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
} else {
parser.nextToken();
}
}
ImmutableOpenMap<String, Settings> settingsMap = ImmutableOpenMap.<String, Settings>builder().putAll(indexToSettings).build();
return new GetSettingsResponse(settingsMap);
}
private static void parseIndexEntry(XContentParser parser, Map<String, Settings> indexToSettings,
Map<String, Settings> indexToDefaultSettings) throws IOException {
String indexName = parser.currentName();
parser.nextToken();
while (!parser.isClosed() && parser.currentToken() != XContentParser.Token.END_OBJECT) {
parseSettingsField(parser, indexName, indexToSettings, indexToDefaultSettings);
}
}
private static void parseSettingsField(XContentParser parser, String currentIndexName, Map<String, Settings> indexToSettings,
Map<String, Settings> indexToDefaultSettings) throws IOException {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
switch (parser.currentName()) {
case "settings":
indexToSettings.put(currentIndexName, Settings.fromXContent(parser));
break;
case "defaults":
indexToDefaultSettings.put(currentIndexName, Settings.fromXContent(parser));
break;
default:
parser.skipChildren();
}
} else if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
}
parser.nextToken();
protected CheckedFunction<XContentParser, GetSettingsResponse, IOException> entityParser(HttpResponse httpResponse) {
return GetSettingsResponse::fromXContent;
}
}

View file

@ -2,7 +2,7 @@ package org.xbib.elx.http.action.admin.indices.settings.put;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
@ -11,11 +11,12 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.io.UncheckedIOException;
public class HttpUpdateSettingsAction extends HttpAction<UpdateSettingsRequest, UpdateSettingsResponse> {
public class HttpUpdateSettingsAction extends HttpAction<UpdateSettingsRequest, AcknowledgedResponse> {
@Override
public UpdateSettingsAction getActionInstance() {
@ -37,12 +38,7 @@ public class HttpUpdateSettingsAction extends HttpAction<UpdateSettingsRequest,
}
@Override
protected CheckedFunction<XContentParser, UpdateSettingsResponse, IOException> entityParser() {
return UpdateSettingsResponse::fromXContent;
}
@Override
protected UpdateSettingsResponse emptyResponse() {
return new UpdateSettingsResponse();
protected CheckedFunction<XContentParser, AcknowledgedResponse, IOException> entityParser(HttpResponse httpResponse) {
return AcknowledgedResponse::fromXContent;
}
}

View file

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
@ -32,16 +33,13 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
IndexRequest indexRequest = (IndexRequest) actionRequest;
bulkContent.append("{\"").append(indexRequest.opType().getLowercase()).append("\":{");
bulkContent.append("\"_index\":\"").append(indexRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append(indexRequest.type()).append("\"");
bulkContent.append(",\"_type\":\"").append("_doc").append("\"");
if (indexRequest.id() != null) {
bulkContent.append(",\"_id\":\"").append(indexRequest.id()).append("\"");
}
if (indexRequest.routing() != null) {
bulkContent.append(",\"_routing\":\"").append(indexRequest.routing()).append("\"");
}
if (indexRequest.parent() != null) {
bulkContent.append(",\"_parent\":\"").append(indexRequest.parent()).append("\"");
}
if (indexRequest.version() > 0) {
bulkContent.append(",\"_version\":\"").append(indexRequest.version()).append("\"");
if (indexRequest.versionType() != null) {
@ -55,14 +53,11 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
bulkContent.append("{\"update\":{");
bulkContent.append("\"_index\":\"").append(updateRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append(updateRequest.type()).append("\"");
bulkContent.append(",\"_type\":\"").append("_doc").append("\"");
bulkContent.append(",\"_id\":\"").append(updateRequest.id()).append("\"");
if (updateRequest.routing() != null) {
bulkContent.append(",\"_routing\":\"").append(updateRequest.routing()).append("\"");
}
if (updateRequest.parent() != null) {
bulkContent.append(",\"_parent\":\"").append(updateRequest.parent()).append("\"");
}
if (updateRequest.version() > 0) {
bulkContent.append(",\"_version\":\"").append(updateRequest.version()).append("\"");
if (updateRequest.versionType() != null) {
@ -80,7 +75,7 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
DeleteRequest deleteRequest = (DeleteRequest) actionRequest;
bulkContent.append("{\"delete\":{");
bulkContent.append("\"_index\":\"").append(deleteRequest.index()).append("\"");
bulkContent.append(",\"_type\":\"").append(deleteRequest.type()).append("\"");
bulkContent.append(",\"_type\":\"").append("_doc").append("\"");
bulkContent.append(",\"_id\":\"").append(deleteRequest.id()).append("\"");
if (deleteRequest.routing() != null) {
bulkContent.append(",\"_routing\":\"").append(deleteRequest.routing()).append("\""); // _routing
@ -92,14 +87,7 @@ public class HttpBulkAction extends HttpAction<BulkRequest, BulkResponse> {
}
@Override
protected CheckedFunction<XContentParser, BulkResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, BulkResponse, IOException> entityParser(HttpResponse httpResponse) {
return BulkResponse::fromXContent;
}
@Override
protected BulkResponse emptyResponse() {
BulkItemResponse[] responses = null;
long took = 0L;
return new BulkResponse(responses, took);
}
}

View file

@ -1,6 +1,6 @@
package org.xbib.elx.http.action.get;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -8,28 +8,24 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpExistsAction extends HttpAction<GetRequest, GetResponse> {
@Override
public GenericAction<GetRequest, GetResponse> getActionInstance() {
public ActionType<GetResponse> getActionInstance() {
return GetAction.INSTANCE;
}
@Override
protected Request.Builder createHttpRequest(String url, GetRequest request) {
return newHeadRequest(url, "/" + request.index() + "/" + request.type() + "/" + request.id());
return newHeadRequest(url, "/" + request.index() + "/_doc/" + request.id());
}
@Override
protected CheckedFunction<XContentParser, GetResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, GetResponse, IOException> entityParser(HttpResponse httpResponse) {
return GetResponse::fromXContent;
}
@Override
protected GetResponse emptyResponse() {
return new GetResponse();
}
}

View file

@ -1,6 +1,6 @@
package org.xbib.elx.http.action.get;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -8,28 +8,24 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpGetAction extends HttpAction<GetRequest, GetResponse> {
@Override
public GenericAction<GetRequest, GetResponse> getActionInstance() {
public ActionType<GetResponse> getActionInstance() {
return GetAction.INSTANCE;
}
@Override
protected Request.Builder createHttpRequest(String url, GetRequest request) {
return newGetRequest(url, "/" + request.index() + "/" + request.type() + "/" + request.id());
return newGetRequest(url, "/" + request.index() + "/_doc/" + "/" + request.id());
}
@Override
protected CheckedFunction<XContentParser, GetResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, GetResponse, IOException> entityParser(HttpResponse httpResponse) {
return GetResponse::fromXContent;
}
@Override
protected GetResponse emptyResponse() {
return new GetResponse();
}
}

View file

@ -1,6 +1,6 @@
package org.xbib.elx.http.action.index;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
@ -8,29 +8,25 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpIndexAction extends HttpAction<IndexRequest, IndexResponse> {
@Override
public GenericAction<IndexRequest, IndexResponse> getActionInstance() {
public ActionType<IndexResponse> getActionInstance() {
return IndexAction.INSTANCE;
}
@Override
protected Request.Builder createHttpRequest(String url, IndexRequest request) {
return newPutRequest(url, "/" + request.index() + "/" + request.type() + "/" + request.id(),
return newPutRequest(url, "/" + request.index() + "/_doc/" + request.id(),
request.source());
}
@Override
protected CheckedFunction<XContentParser, IndexResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, IndexResponse, IOException> entityParser(HttpResponse httpResponse) {
return IndexResponse::fromXContent;
}
@Override
protected IndexResponse emptyResponse() {
return new IndexResponse();
}
}

View file

@ -1,6 +1,6 @@
package org.xbib.elx.http.action.main;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.main.MainAction;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
@ -8,13 +8,14 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpMainAction extends HttpAction<MainRequest, MainResponse> {
@Override
public GenericAction<MainRequest, MainResponse> getActionInstance() {
public ActionType<MainResponse> getActionInstance() {
return MainAction.INSTANCE;
}
@ -24,12 +25,7 @@ public class HttpMainAction extends HttpAction<MainRequest, MainResponse> {
}
@Override
protected CheckedFunction<XContentParser, MainResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, MainResponse, IOException> entityParser(HttpResponse httpResponse) {
return MainResponse::fromXContent;
}
@Override
protected MainResponse emptyResponse() {
return new MainResponse();
}
}

View file

@ -7,6 +7,7 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
@ -25,12 +26,7 @@ public class HttpSearchAction extends HttpAction<SearchRequest, SearchResponse>
}
@Override
protected CheckedFunction<XContentParser, SearchResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, SearchResponse, IOException> entityParser(HttpResponse httpResponse) {
return SearchResponse::fromXContent;
}
@Override
protected SearchResponse emptyResponse() {
return new SearchResponse();
}
}

View file

@ -1,6 +1,6 @@
package org.xbib.elx.http.action.update;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
@ -12,13 +12,14 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
public class HttpUpdateAction extends HttpAction<UpdateRequest, UpdateResponse> {
@Override
public GenericAction<UpdateRequest, UpdateResponse> getActionInstance() {
public ActionType<UpdateResponse> getActionInstance() {
return UpdateAction.INSTANCE;
}
@ -47,7 +48,7 @@ public class HttpUpdateAction extends HttpAction<UpdateRequest, UpdateResponse>
}
BytesReference source = XContentHelper.toXContent(updateRequest, xContentType, false);
return newPostRequest(url,
"/" + updateRequest.index() + "/" + updateRequest.type() + "/" + updateRequest.id() + "/_update",
"/" + updateRequest.index() + "/_doc/" + updateRequest.id() + "/_update",
source);
} catch (IOException e) {
logger.error(e.getMessage(), e);
@ -56,12 +57,7 @@ public class HttpUpdateAction extends HttpAction<UpdateRequest, UpdateResponse>
}
@Override
protected CheckedFunction<XContentParser, UpdateResponse, IOException> entityParser() {
protected CheckedFunction<XContentParser, UpdateResponse, IOException> entityParser(HttpResponse httpResponse) {
return UpdateResponse::fromXContent;
}
@Override
protected UpdateResponse emptyResponse() {
return new UpdateResponse();
}
}

View file

@ -1,4 +1,4 @@
org.elasticsearch.action.admin.indices.mapping.get.HttpGetMappingsAction
org.xbib.elx.http.action.admin.indices.mapping.get.HttpGetMappingsAction
org.xbib.elx.http.action.admin.cluster.health.HttpClusterHealthAction
org.xbib.elx.http.action.admin.cluster.node.info.HttpNodesInfoAction
org.xbib.elx.http.action.admin.cluster.settings.HttpClusterUpdateSettingsAction

View file

@ -1,19 +1,25 @@
package org.xbib.elx.http.test;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
@ -21,6 +27,7 @@ import org.xbib.elx.common.Parameters;
import org.xbib.elx.http.ExtendedHttpClient;
import org.xbib.elx.http.ExtendedHttpClientProvider;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -82,28 +89,50 @@ class ClientTest {
}
@Test
void testMapping() throws Exception {
void testNewIndexWithSettings() throws Exception {
final ExtendedHttpClient client = ClientBuilder.builder()
.provider(ExtendedHttpClientProvider.class)
.put(helper.getHttpSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
Settings settings = Settings.builder().put("index.number_of_shards", "1").build();
client.newIndex("test", settings);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest()
.indices("test");
GetSettingsResponse getSettingsResponse =
client.getClient().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
logger.log(Level.INFO, "settings=" + getSettingsResponse.getSetting("test", "index.number_of_shards"));
assertEquals("1", getSettingsResponse.getSetting("test", "index.number_of_shards"));
client.close();
}
@Test
void testNewIndexWithSettingsAndMappings() throws Exception {
final ExtendedHttpClient client = ClientBuilder.builder()
.provider(ExtendedHttpClientProvider.class)
.put(helper.getHttpSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
Settings settings = Settings.builder().put("index.number_of_shards", "1").build();
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.startObject("pos")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
client.newIndex("test", Settings.EMPTY, Strings.toString(builder));
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
client.newIndex("test", settings, builder);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest()
.indices("test");
GetSettingsResponse getSettingsResponse =
client.getClient().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
assertEquals("1", getSettingsResponse.getSetting("test", "index.number_of_shards"));
GetMappingsRequest getMappingsRequest = new GetMappingsRequest()
.indices("test");
GetMappingsResponse getMappingsResponse =
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("_doc"));
client.close();
}
@ -136,8 +165,7 @@ class ClientTest {
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(numactions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
assertEquals(numactions, searchRequestBuilder.execute().actionGet().getHits().getTotalHits().value);
client.close();
}
}
@ -146,7 +174,7 @@ class ClientTest {
void testThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors();
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
final Long actions = ACTIONS;
final long actions = ACTIONS;
logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
final ExtendedHttpClient client = ClientBuilder.builder()
.provider(ExtendedHttpClientProvider.class)
@ -196,11 +224,19 @@ class ClientTest {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
logger.log(Level.INFO, "refreshing index test");
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(maxthreads * actions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("test")
.source(builder);
SearchResponse searchResponse = client.getClient().execute(SearchAction.INSTANCE, searchRequest).actionGet();
assertEquals(maxthreads * actions, searchResponse.getHits().getTotalHits().value);
client.close();
client.close();
}
}

View file

@ -3,8 +3,11 @@ package org.xbib.elx.http.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
@ -14,7 +17,6 @@ import org.xbib.elx.http.ExtendedHttpClientProvider;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -26,7 +28,7 @@ class DuplicateIDTest {
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
private static final Long ACTIONS = 50L;
private static final Long ACTIONS = 100L;
private final TestExtension.Helper helper;
@ -51,11 +53,16 @@ class DuplicateIDTest {
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test")
.setTypes("test")
.setQuery(matchAllQuery());
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("test")
.source(builder);
SearchResponse searchResponse =
helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet();
long hits = searchResponse.getHits().getTotalHits().value;
logger.info("hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {

View file

@ -66,9 +66,9 @@ class IndexPruneTest {
indicesExistsRequest.indices(index);
IndicesExistsResponse indicesExistsResponse =
client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
logger.info("indices exists response for {} is {}", index, indicesExistsResponse.isExists());
list.add(indicesExistsResponse.isExists());
}
logger.info(list);
assertFalse(list.get(0));
assertFalse(list.get(1));
assertTrue(list.get(2));

View file

@ -6,10 +6,12 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
import java.util.Map;
public class MockNode extends Node {
public MockNode(Settings settings, List<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);
super(InternalSettingsPreparer.prepareEnvironment(settings, Map.of(), null, () -> "mock"),
classpathPlugins, false);
}
}

View file

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -22,7 +21,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@ -37,7 +36,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -192,8 +191,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("discovery.zen.master_election.ignore_non_master_pings", "true")
.put("transport.netty.epoll", "false")
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
.build();
}
@ -228,7 +227,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(getNodeSettings())
.put("node.name", id)
.build();
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);

View file

@ -1,5 +1,6 @@
dependencies {
compile project(':elx-common')
compile "org.xbib.elasticsearch:transport-netty4:${rootProject.property('elasticsearch-server.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-analysis-common:${rootProject.property('elasticsearch-server.version')}"
compile "org.elasticsearch.plugin:transport-netty4-client:${project.property('elasticsearch.version')}"
compile "io.netty:netty-codec-http:${project.property('netty.version')}"
compile "io.netty:netty-transport:${project.property('netty.version')}"
}

View file

@ -63,7 +63,7 @@ public class ExtendedNodeClient extends AbstractExtendedClient {
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, classpathPlugins);
super(env, classpathPlugins, false);
}
}
}

View file

@ -1,19 +1,25 @@
package org.xbib.elx.node.test;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
@ -80,27 +86,50 @@ class ClientTest {
}
@Test
void testMapping() throws Exception {
void testNewIndexWithSettings() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(helper.client("1"))
.provider(ExtendedNodeClientProvider.class)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
Settings settings = Settings.builder().put("index.number_of_shards", "1").build();
client.newIndex("test", settings);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest()
.indices("test");
GetSettingsResponse getSettingsResponse =
client.getClient().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
logger.log(Level.INFO, "settings=" + getSettingsResponse.getSetting("test", "index.number_of_shards"));
assertEquals("1", getSettingsResponse.getSetting("test", "index.number_of_shards"));
client.close();
}
@Test
void testNewIndexWithSettingsAndMappings() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(helper.client("1"))
.provider(ExtendedNodeClientProvider.class)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
Settings settings = Settings.builder().put("index.number_of_shards", "1").build();
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
client.newIndex("test", Settings.EMPTY, Strings.toString(builder));
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
client.newIndex("test", settings, builder);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest()
.indices("test");
GetSettingsResponse getSettingsResponse =
client.getClient().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
logger.log(Level.INFO, "settings=" + getSettingsResponse.getSetting("test", "index.number_of_shards"));
assertEquals("1", getSettingsResponse.getSetting("test", "index.number_of_shards"));
GetMappingsRequest getMappingsRequest = new GetMappingsRequest()
.indices("test");
GetMappingsResponse getMappingsResponse =
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("_doc"));
client.close();
}
@ -131,8 +160,7 @@ class ClientTest {
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(numactions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
assertEquals(numactions, searchRequestBuilder.execute().actionGet().getHits().getTotalHits().value);
client.close();
}
}
@ -189,11 +217,18 @@ class ClientTest {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
logger.log(Level.INFO, "refreshing index test");
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(maxthreads * actions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("test")
.source(builder);
SearchResponse searchResponse = client.getClient().execute(SearchAction.INSTANCE, searchRequest).actionGet();
assertEquals(maxthreads * actions, searchResponse.getHits().getTotalHits().value);
client.close();
}
}

View file

@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -27,7 +28,7 @@ class DuplicateIDTest {
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
private static final Long ACTIONS = 50L;
private static final Long ACTIONS = 100L;
private final TestExtension.Helper helper;
@ -51,13 +52,16 @@ class DuplicateIDTest {
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test");
searchRequest.types("test");
searchRequest.source(builder);
long hits = helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("test")
.source(builder);
SearchResponse searchResponse =
helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet();
long hits = searchResponse.getHits().getTotalHits().value;
logger.info("hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {

View file

@ -6,10 +6,12 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
import java.util.Map;
public class MockNode extends Node {
public MockNode(Settings settings, List<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);
super(InternalSettingsPreparer.prepareEnvironment(settings, Map.of(), null, () -> "mock"),
classpathPlugins, false);
}
}

View file

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -22,7 +21,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@ -37,7 +36,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -192,8 +191,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("discovery.zen.master_election.ignore_non_master_pings", "true")
.put("transport.netty.epoll", "false")
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
.build();
}
@ -219,7 +218,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(getNodeSettings())
.put("node.name", id)
.build();
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);

View file

@ -1,5 +1,6 @@
dependencies {
compile project(':elx-common')
compile "org.xbib.elasticsearch:transport-netty4:${rootProject.property('elasticsearch-server.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-analysis-common:${rootProject.property('elasticsearch-server.version')}"
compile "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
compile "io.netty:netty-codec-http:${project.property('netty.version')}"
compile "io.netty:netty-transport:${project.property('netty.version')}"
}

View file

@ -22,7 +22,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.xbib.elx.common.AbstractExtendedClient;
import org.xbib.elx.common.util.NetworkUtils;

View file

@ -6,11 +6,13 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -85,28 +87,45 @@ class ClientTest {
}
@Test
void testMapping() throws Exception {
void testNewIndexWithSettings() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(helper.getTransportSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
Settings settings = Settings.builder().put("index.number_of_shards", "1").build();
client.newIndex("test", settings);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest()
.indices("test");
GetSettingsResponse getSettingsResponse =
client.getClient().execute(GetSettingsAction.INSTANCE, getSettingsRequest).actionGet();
logger.log(Level.INFO, "settings=" + getSettingsResponse.getSetting("test", "index.number_of_shards"));
assertEquals("1", getSettingsResponse.getSetting("test", "index.number_of_shards"));
client.close();
}
@Test
void testNewIndexWithSettingsAndMapping() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(helper.getTransportSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
Settings settings = Settings.builder().put("index.number_of_shards", "1").build();
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
client.newIndex("test", Settings.EMPTY, Strings.toString(builder));
client.newIndex("test", settings, builder);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
GetMappingsResponse getMappingsResponse =
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("_doc"));
client.close();
}
@ -146,7 +165,7 @@ class ClientTest {
searchRequest.source(builder);
SearchResponse searchResponse = client.getClient().execute(SearchAction.INSTANCE, searchRequest).actionGet();
logger.log(Level.INFO, searchResponse.toString());
assertEquals(numactions, searchResponse.getHits().getTotalHits());
assertEquals(numactions, searchResponse.getHits().getTotalHits().value);
client.close();
}
}
@ -207,16 +226,18 @@ class ClientTest {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
logger.log(Level.INFO, "refreshing index test");
client.refreshIndex("test");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
builder.size(0);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test");
searchRequest.source(builder);
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("test")
.source(builder);
SearchResponse searchResponse = client.getClient().execute(SearchAction.INSTANCE, searchRequest).actionGet();
logger.log(Level.INFO, searchResponse.toString());
assertEquals(maxthreads * actions, searchResponse.getHits().getTotalHits());
assertEquals(maxthreads * actions, searchResponse.getHits().getTotalHits().value);
client.close();
}
}

View file

@ -28,7 +28,7 @@ class DuplicateIDTest {
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
private static final Long ACTIONS = 5L;
private static final Long ACTIONS = 100L;
private final TestExtension.Helper helper;
@ -53,16 +53,16 @@ class DuplicateIDTest {
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test_dup");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
builder.size(0);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test_dup");
searchRequest.types("test_dup");
searchRequest.source(builder);
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0)
.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("test_dup")
.source(builder);
SearchResponse searchResponse =
helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet();
long hits = searchResponse.getHits().getTotalHits();
long hits = searchResponse.getHits().getTotalHits().value;
logger.info("hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {

View file

@ -6,10 +6,12 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
import java.util.Map;
public class MockNode extends Node {
public MockNode(Settings settings, List<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);
super(InternalSettingsPreparer.prepareEnvironment(settings, Map.of(), null, () -> "mock"),
classpathPlugins, false);
}
}

View file

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -22,7 +21,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@ -37,7 +36,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -192,7 +191,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("discovery.zen.master_election.ignore_non_master_pings", "true")
.put("cluster.initial_master_nodes", "1")
.put("discovery.seed_hosts", "127.0.0.1:9300")
.build();
}
@ -227,7 +227,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(getNodeSettings())
.put("node.name", id)
.build();
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
List<Class<? extends Plugin>> plugins = Collections.singletonList(Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);

View file

@ -1,19 +1,16 @@
group = org.xbib
name = elx
version = 6.3.2.5
profile = default
release = 0
version = 7.6.1.0
elasticsearch-server.version = 6.3.2.4
log4j.version = 2.12.1
elasticsearch.version = 7.6.1
netty.version = 4.1.49.Final
xbib-netty-http.version = 4.1.49.0
tcnative.version = 2.0.29.Final
tcnative-legacy-macosx.version = 2.0.26.Final
bouncycastle.version = 1.64
xbib-metrics.version = 2.0.0
xbib-netty-http.version = 4.1.49.0
# test
junit.version = 5.4.2
log4j.version = 2.13.1
junit.version = 5.6.2
asciidoclet.version = 1.6.0.0
org.gradle.warning.mode = all