changes from 6.3 branch, some fixes, better timeouts, better index shift

Jörg Prante 5 years ago
parent 5184b75b36
commit fcd99f801d

@ -1,18 +0,0 @@
dependencies {
compile("org.elasticsearch.client:transport:${rootProject.property('elasticsearch.version')}") {
exclude group: 'org.elasticsearch', module: 'securesm'
exclude group: 'org.elasticsearch.plugin', module: 'transport-netty3-client'
exclude group: 'org.elasticsearch.plugin', module: 'reindex-client'
exclude group: 'org.elasticsearch.plugin', module: 'percolator-client'
exclude group: 'org.elasticsearch.plugin', module: 'lang-mustache-client'
}
// we try to override the Elasticsearch netty by our netty version which might be more recent
compile "io.netty:netty-buffer:${rootProject.property('netty.version')}"
compile "io.netty:netty-codec-http:${rootProject.property('netty.version')}"
compile "io.netty:netty-handler:${rootProject.property('netty.version')}"
}
jar {
baseName "${rootProject.name}-api"
}

@ -10,9 +10,11 @@ import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable {
BulkProcessor add(ActionRequest<?> request);
@SuppressWarnings("rawtype")
BulkProcessor add(ActionRequest request);
BulkProcessor add(ActionRequest<?> request, Object payload);
@SuppressWarnings("rawtype")
BulkProcessor add(ActionRequest request, Object payload);
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;

@ -169,6 +169,16 @@ public interface ExtendedClient extends Flushable, Closeable {
*/
ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(String index, Settings settings) throws IOException;
/**
* Create a new index.
*
@ -364,9 +374,11 @@ public interface ExtendedClient extends Flushable, Closeable {
* @param index the index
* @param key the key of the value to be updated
* @param value the new value
* @param timeout timeout
* @param timeUnit time unit
* @throws IOException if update index setting failed
*/
void updateIndexSetting(String index, String key, Object value) throws IOException;
void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException;
/**
* Resolve alias.
@ -386,11 +398,11 @@ public interface ExtendedClient extends Flushable, Closeable {
String resolveMostRecentIndex(String alias);
/**
* Get all index filters.
* Get all aliases.
* @param index the index
* @return map of index filters
* @return map of index aliases
*/
Map<String, String> getIndexFilters(String index);
Map<String, String> getAliases(String index);
/**
* Shift from one index to another.

@ -1,9 +1,9 @@
package org.xbib.elx.api;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@FunctionalInterface
public interface IndexAliasAdder {
void addIndexAlias(IndicesAliasesRequestBuilder builder, String index, String alias);
void addIndexAlias(IndicesAliasesRequest request, String index, String alias);
}

@ -1,4 +1,4 @@
/**
* The API of the Elasticsearch extensions.
* The API of the extended Elasticsearch clients.
*/
package org.xbib.elx.api;

@ -1,8 +1,6 @@
dependencies {
compile project(':elx-api')
compile "org.xbib:guice:${project.property('xbib-guice.version')}"
// add all dependencies to runtime source set, even that which are excluded by Elasticsearch jar,
// for metaprogramming. We are in Groovyland.
runtime "com.vividsolutions:jts:${project.property('jts.version')}"
runtime "com.github.spullara.mustache.java:compiler:${project.property('mustache.version')}"
runtime "net.java.dev.jna:jna:${project.property('jna.version')}"

@ -1,65 +0,0 @@
buildscript {
repositories {
jcenter()
maven {
url 'http://xbib.org/repository'
}
}
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:6.2.2.0"
}
}
apply plugin: 'org.xbib.gradle.plugin.elasticsearch.build'
configurations {
main
tests
}
dependencies {
compile project(':api')
compile "org.xbib:metrics:${project.property('xbib-metrics.version')}"
compileOnly "org.apache.logging.log4j:log4j-api:${project.property('log4j.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
}
jar {
baseName "${rootProject.name}-common"
}
/*
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
}
*/
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
}
test {
enabled = false
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', project.buildDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
}
}
randomizedTest {
enabled = false
}
esTest {
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty 'tests.security.manager', 'true'
}
esTest.dependsOn jar, testJar

@ -9,29 +9,29 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
@ -46,13 +46,16 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -62,6 +65,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
@ -82,14 +86,15 @@ import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
@ -159,6 +164,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException;
protected abstract void closeClient() throws IOException;
protected AbstractExtendedClient() {
closed = new AtomicBoolean(false);
}
@ -166,9 +173,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public AbstractExtendedClient setClient(ElasticsearchClient client) {
this.client = client;
this.bulkMetric = new DefaultBulkMetric();
bulkMetric.start();
this.bulkController = new DefaultBulkController(this, bulkMetric);
return this;
}
@ -192,10 +196,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (client == null) {
client = createClient(settings);
}
if (bulkMetric != null) {
bulkMetric.start();
if (bulkMetric == null) {
this.bulkMetric = new DefaultBulkMetric();
this.bulkMetric.init(settings);
}
if (bulkController != null) {
if (bulkController == null) {
this.bulkController = new DefaultBulkController(this, bulkMetric);
bulkController.init(settings);
}
return this;
@ -213,14 +219,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
ensureActive();
if (closed.compareAndSet(false, true)) {
if (bulkMetric != null) {
logger.info("closing bulk metric before bulk controller (for precise measurement)");
logger.info("closing bulk metric");
bulkMetric.close();
}
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();
}
logger.info("shutdown complete");
closeClient();
}
}
@ -228,9 +234,9 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public String getClusterName() {
ensureActive();
try {
ClusterStateRequestBuilder clusterStateRequestBuilder =
new ClusterStateRequestBuilder(client, ClusterStateAction.INSTANCE).all();
ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
} catch (ElasticsearchTimeoutException e) {
logger.warn(e.getMessage(), e);
@ -280,7 +286,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public ExtendedClient newIndex(String index) {
public ExtendedClient newIndex(String index) throws IOException {
return newIndex(index, Settings.EMPTY, (Map<String, Object>) null);
}
@ -288,31 +294,35 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException {
return newIndex(index,
Settings.settingsBuilder().loadFromStream(".json", settings).build(),
JsonXContent.jsonXContent.createParser(mapping).mapOrdered());
mapping != null ? JsonXContent.jsonXContent.createParser(mapping).mapOrdered() : null);
}
@Override
public ExtendedClient newIndex(String index, Settings settings) throws IOException {
return newIndex(index, settings, (Map<String, Object>) null);
}
@Override
public ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException {
return newIndex(index, settings,
JsonXContent.jsonXContent.createParser(mapping).mapOrdered());
mapping != null ? JsonXContent.jsonXContent.createParser(mapping).mapOrdered() : null);
}
@Override
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) {
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException {
ensureActive();
if (index == null) {
logger.warn("no index name given to create index");
return this;
}
CreateIndexRequestBuilder createIndexRequestBuilder =
new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE).setIndex(index);
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
if (settings != null) {
createIndexRequestBuilder.setSettings(settings);
createIndexRequest.settings(settings);
}
if (mapping != null) {
createIndexRequestBuilder.addMapping(TYPE_NAME, mapping);
createIndexRequest.mapping(TYPE_NAME, mapping);
}
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
logger.info("index {} created: {}", index, createIndexResponse);
return this;
}
@ -329,9 +339,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
logger.warn("no index name given to delete index");
return this;
}
DeleteIndexRequestBuilder deleteIndexRequestBuilder =
new DeleteIndexRequestBuilder(client, DeleteIndexAction.INSTANCE, index);
deleteIndexRequestBuilder.execute().actionGet();
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
return this;
}
@ -371,12 +380,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source));
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
.source(source));
}
@Override
public ExtendedClient index(String index, String id, boolean create, String source) {
return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source.getBytes(StandardCharsets.UTF_8)));
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
.source(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
@ -425,12 +436,18 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureIndexGiven(index);
RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, new RecoveryRequest(index)).actionGet();
RecoveryRequest recoveryRequest = new RecoveryRequest();
recoveryRequest.indices(index);
recoveryRequest.activeOnly(true);
RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, recoveryRequest).actionGet();
int shards = response.getTotalShards();
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.indices(new String[]{index})
.waitForActiveShards(shards)
.timeout(timeout);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest(index)
.waitForActiveShards(shards).timeout(timeout)).actionGet();
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
logger.error("timeout waiting for recovery");
return false;
@ -485,7 +502,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public ExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException {
waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations
if (level > 0) {
updateIndexSetting(index, "number_of_replicas", level);
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
waitForRecovery(index, maxWaitTime, timeUnit);
}
return this;
@ -528,25 +545,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return this;
}
@Override
public String resolveAlias(String alias) {
ensureActive();
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE);
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
if (!getAliasesResponse.getAliases().isEmpty()) {
return getAliasesResponse.getAliases().keys().iterator().next().value;
}
return alias;
}
@Override
public String resolveMostRecentIndex(String alias) {
ensureActive();
if (alias == null) {
return null;
}
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE);
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> indices = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
@ -558,10 +564,24 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return indices.isEmpty() ? alias : indices.iterator().next();
}
public Map<String, String> getAliases(String index) {
if (index == null) {
return Collections.emptyMap();
}
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().indices(index);
return getFilters(client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet());
}
@Override
public Map<String, String> getIndexFilters(String index) {
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client, GetAliasesAction.INSTANCE);
return getFilters(getAliasesRequestBuilder.setIndices(index).execute().actionGet());
public String resolveAlias(String alias) {
ensureActive();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.metaData(true);
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
SortedMap<String, AliasOrIndex> map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup();
AliasOrIndex aliasOrIndex = map.get(alias);
return aliasOrIndex != null ? aliasOrIndex.getIndices().iterator().next().getIndex() : null;
}
@Override
@ -593,60 +613,81 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public IndexShiftResult shiftIndex(String index, String fullIndexName,
List<String> additionalAliases, IndexAliasAdder adder) {
ensureActive();
if (index == null) {
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
if (index.equals(fullIndexName)) {
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
// two situations: 1. there is a new alias 2. there is already an old index with the alias
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
// two situations: 1. a new alias 2. there is already an old index with the alias
String oldIndex = resolveAlias(index);
final Map<String, String> oldFilterMap = oldIndex.equals(index) ? null : getIndexFilters(oldIndex);
final List<String> newAliases = new LinkedList<>();
final List<String> moveAliases = new LinkedList<>();
IndicesAliasesRequestBuilder requestBuilder = new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE);
if (oldFilterMap == null || !oldFilterMap.containsKey(index)) {
// never apply a filter for trunk index name
requestBuilder.addAlias(fullIndexName, index);
Map<String, String> oldAliasMap = index.equals(oldIndex) ? null : getAliases(oldIndex);
logger.debug("old index = {} old alias map = {}", oldIndex, oldAliasMap);
final List<String> newAliases = new ArrayList<>();
final List<String> moveAliases = new ArrayList<>();
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
if (oldAliasMap == null || !oldAliasMap.containsKey(index)) {
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, index));
newAliases.add(index);
}
// move existing aliases
if (oldFilterMap != null) {
for (Map.Entry<String, String> entry : oldFilterMap.entrySet()) {
if (oldAliasMap != null) {
for (Map.Entry<String, String> entry : oldAliasMap.entrySet()) {
String alias = entry.getKey();
String filter = entry.getValue();
requestBuilder.removeAlias(oldIndex, alias);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.REMOVE,
oldIndex, alias));
if (filter != null) {
requestBuilder.addAlias(fullIndexName, alias, filter);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, index).filter(filter));
} else {
requestBuilder.addAlias(fullIndexName, alias);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, index));
}
moveAliases.add(alias);
}
}
// a list of aliases that should be added, check if new or old
if (additionalAliases != null) {
for (String extraAlias : additionalAliases) {
if (oldFilterMap == null || !oldFilterMap.containsKey(extraAlias)) {
for (String additionalAlias : additionalAliases) {
if (oldAliasMap == null || !oldAliasMap.containsKey(additionalAlias)) {
// index alias adder only active on extra aliases, and if alias is new
if (adder != null) {
adder.addIndexAlias(requestBuilder, fullIndexName, extraAlias);
adder.addIndexAlias(indicesAliasesRequest, fullIndexName, additionalAlias);
} else {
requestBuilder.addAlias(fullIndexName, extraAlias);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, additionalAlias));
}
newAliases.add(extraAlias);
newAliases.add(additionalAlias);
} else {
String filter = oldFilterMap.get(extraAlias);
requestBuilder.removeAlias(oldIndex, extraAlias);
String filter = oldAliasMap.get(additionalAlias);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.REMOVE,
oldIndex, additionalAlias));
if (filter != null) {
requestBuilder.addAlias(fullIndexName, extraAlias, filter);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, additionalAlias).filter(filter));
} else {
requestBuilder.addAlias(fullIndexName, extraAlias);
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
fullIndexName, additionalAlias));
}
moveAliases.add(extraAlias);
moveAliases.add(additionalAlias);
}
}
}
if (!newAliases.isEmpty() || !moveAliases.isEmpty()) {
logger.info("new aliases = {}, moved aliases = {}", newAliases, moveAliases);
requestBuilder.execute().actionGet();
if (!indicesAliasesRequest.getAliasActions().isEmpty()) {
StringBuilder sb = new StringBuilder();
for (IndicesAliasesRequest.AliasActions aliasActions : indicesAliasesRequest.getAliasActions()) {
sb.append("[").append(aliasActions.actionType().name())
.append(",indices=").append(Arrays.asList(aliasActions.indices()))
.append(",aliases=").append(Arrays.asList(aliasActions.aliases())).append("]");
}
logger.debug("indices alias request = {}", sb.toString());
IndicesAliasesResponse indicesAliasesResponse =
client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet();
logger.debug("response isAcknowledged = {}",
indicesAliasesResponse.isAcknowledged());
}
return new SuccessIndexShiftResult(moveAliases, newAliases);
}
@ -711,13 +752,15 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public Long mostRecentDocument(String index, String timestampfieldname) {
ensureActive();
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchResponse searchResponse = searchRequestBuilder.setIndices(index)
.addField(timestampfieldname)
.setSize(1)
.addSort(sort)
.execute().actionGet();
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.field(timestampfieldname);
sourceBuilder.size(1);
sourceBuilder.sort(sort);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
if (searchResponse.getHits().getHits().length == 1) {
SearchHit hit = searchResponse.getHits().getHits()[0];
if (hit.getFields().get(timestampfieldname) != null) {
@ -741,11 +784,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ForceMergeRequestBuilder forceMergeRequestBuilder =
new ForceMergeRequestBuilder(client, ForceMergeAction.INSTANCE);
forceMergeRequestBuilder.setIndices(index);
ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
forceMergeRequest.indices(index);
try {
forceMergeRequestBuilder.execute().get(timeout.getMillis(), TimeUnit.MILLISECONDS);
client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).get(timeout.getMillis(), TimeUnit.MILLISECONDS);
return true;
} catch (TimeoutException e) {
logger.error("timeout");
@ -794,7 +836,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public void updateIndexSetting(String index, String key, Object value) throws IOException {
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
ensureActive();
if (index == null) {
throw new IOException("no index name given");
@ -808,7 +850,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
Settings.Builder updateSettingsBuilder = Settings.settingsBuilder();
updateSettingsBuilder.put(key, value.toString());
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
.settings(updateSettingsBuilder);
.settings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
}
@ -845,9 +887,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public void checkMapping(String index) {
ensureActive();
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
.setIndices(index);
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
ImmutableOpenMap<String, MappingMetaData> mappings = map.get(stringObjectCursor.value);
@ -930,7 +971,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
.setQuery(queryBuilder)
.execute()
.actionGet();
fields.put(path, searchResponse.getHits().totalHits());
fields.put(path, searchResponse.getHits().getTotalHits());
}
}
}

@ -107,7 +107,8 @@ public class DefaultBulkController implements BulkController {
startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds);
stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds);
if (startRefreshIntervalInSeconds != 0L) {
client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s");
client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s",
30L, TimeUnit.SECONDS);
}
}
}
@ -193,7 +194,8 @@ public class DefaultBulkController implements BulkController {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
client.updateIndexSetting(index, "refresh_interval", secs + "s");
client.updateIndexSetting(index, "refresh_interval", secs + "s",
30L, TimeUnit.SECONDS);
}
indexNames.remove(index);
}
@ -214,7 +216,8 @@ public class DefaultBulkController implements BulkController {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L)
client.updateIndexSetting(index, "refresh_interval", secs + "s");
client.updateIndexSetting(index, "refresh_interval", secs + "s",
30L, TimeUnit.SECONDS);
}
indexNames.clear();
}

@ -49,7 +49,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytes();
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
new SyncBulkRequestHandler(client, listener) :
@ -133,7 +133,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
* @return his bulk processor
*/
@Override
public DefaultBulkProcessor add(ActionRequest<?> request) {
public DefaultBulkProcessor add(ActionRequest request) {
return add(request, null);
}
@ -145,7 +145,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
* @return his bulk processor
*/
@Override
public DefaultBulkProcessor add(ActionRequest<?> request, Object payload) {
public DefaultBulkProcessor add(ActionRequest request, Object payload) {
internalAdd(request, payload);
return this;
}
@ -176,7 +176,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private void ensureOpen() {
if (closed) {
throw new IllegalStateException("bulk process already closed");
throw new IllegalStateException("bulk processor already closed");
}
}

@ -6,10 +6,11 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Mock client, it does not perform any actions on a cluster. Useful for testing.
* A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/
public class MockExtendedClient extends AbstractExtendedClient {
@ -28,6 +29,10 @@ public class MockExtendedClient extends AbstractExtendedClient {
return null;
}
@Override
protected void closeClient() {
}
@Override
public MockExtendedClient index(String index, String id, boolean create, String source) {
return this;

@ -1,34 +0,0 @@
package org.elasticsearch.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
public class MockNode extends Node {
public MockNode() {
super(Settings.EMPTY);
}
public MockNode(Settings settings) {
super(settings);
}
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
}
public MockNode(Settings settings, Class<? extends Plugin> classpathPlugin) {
this(settings, list(classpathPlugin));
}
private static Collection<Class<? extends Plugin>> list(Class<? extends Plugin> classpathPlugin) {
Collection<Class<? extends Plugin>> list = new ArrayList<>();
list.add(classpathPlugin);
return list;
}
}

@ -1,57 +0,0 @@
package org.xbib.elx.common;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.junit.Assert.assertEquals;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.junit.Test;
public class SimpleTest extends NodeTestUtils {
protected Settings getNodeSettings() {
return settingsBuilder()
.put(super.getNodeSettings())
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
.build();
}
@Test
public void test() throws Exception {
try {
DeleteIndexRequestBuilder deleteIndexRequestBuilder =
new DeleteIndexRequestBuilder(client("1"), DeleteIndexAction.INSTANCE, "test");
deleteIndexRequestBuilder.execute().actionGet();
} catch (IndexNotFoundException e) {
// ignore if index not found
}
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client("1"), IndexAction.INSTANCE);
indexRequestBuilder
.setIndex("test")
.setType("test")
.setId("1")
.setSource(jsonBuilder().startObject().field("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject())
.setRefresh(true)
.execute()
.actionGet();
String doc = client("1").prepareSearch("test")
.setTypes("test")
.setQuery(matchQuery("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8"))
.execute()
.actionGet()
.getHits().getAt(0).getSourceAsString();
assertEquals(doc,
"{\"field\":\"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8\"}");
}
}

@ -1,4 +1,4 @@
package org.xbib.elx.common;
package org.xbib.elx.common.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
@ -27,7 +26,7 @@ import java.util.regex.Pattern;
/**
*
*/
public class AliasTest extends NodeTestUtils {
public class AliasTest extends TestBase {
private static final Logger logger = LogManager.getLogger(AliasTest.class.getName());
@ -71,9 +70,9 @@ public class AliasTest extends NodeTestUtils {
indicesAliasesRequest.addAliasAction(aliasAction);
client.admin().indices().aliases(indicesAliasesRequest).actionGet();
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client,
GetAliasesAction.INSTANCE);
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest();
getAliasesRequest.aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> result = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {

@ -1,4 +1,4 @@
package org.xbib.elx.node;
package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -10,17 +10,16 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class ClusterBlockTest extends NodeTestUtils {
public class ClusterBlockTest extends TestBase {
private static final Logger logger = LogManager.getLogger("test");
@Before
public void startNodes() {
try {
setClusterName();
setClusterName("test-cluster" + System.getProperty("user.name"));
startNode("1");
// do not wait for green health state
logger.info("ready");

@ -1,6 +1,9 @@
package org.xbib.elx.common;
package org.xbib.elx.common.test;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.MockExtendedClient;
import org.xbib.elx.common.MockExtendedClientProvider;
import java.io.IOException;

@ -0,0 +1,12 @@
package org.xbib.elx.common.test;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
public class MockNode extends Node {
public MockNode(Settings settings) {
super(settings);
}
}

@ -1,4 +1,4 @@
package org.xbib.elx.common;
package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -15,7 +15,7 @@ public class NetworkTest {
@Test
public void testNetwork() throws Exception {
// walk very slowly over all interfaces
// walk over all found interfaces (this is slow - multicast/pings are performed)
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
for (NetworkInterface netint : Collections.list(nets)) {
System.out.println("checking network interface = " + netint.getName());

@ -1,4 +1,4 @@
package org.xbib.elx.common;
package org.xbib.elx.common.test;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkAction;
@ -15,7 +15,7 @@ import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class SearchTest extends NodeTestUtils {
public class SearchTest extends TestBase {
@Test
public void testSearch() throws Exception {
@ -35,7 +35,8 @@ public class SearchTest extends NodeTestUtils {
.field("user8", "joerg")
.field("user9", "joerg")
.field("rowcount", i)
.field("rs", 1234));
.field("rs", 1234)
.endObject());
builder.add(indexRequest);
}
client.bulk(builder.request()).actionGet();

@ -0,0 +1,61 @@
package org.xbib.elx.common.test;
import static org.junit.Assert.assertEquals;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
public class SimpleTest extends TestBase {
@Test
public void test() throws Exception {
try {
DeleteIndexRequest deleteIndexRequest =
new DeleteIndexRequest().indices("test");
client("1").execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
} catch (IndexNotFoundException e) {
// ignore if index not found
}
Settings indexSettings = Settings.settingsBuilder()
.put(super.getNodeSettings())
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
.build();
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index("test").settings(indexSettings);
client("1").execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
IndexRequest indexRequest = new IndexRequest();
indexRequest.index("test").type("test").id("1")
.source(XContentFactory.jsonBuilder().startObject().field("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject());
client("1").execute(IndexAction.INSTANCE, indexRequest).actionGet();
RefreshRequest refreshRequest = new RefreshRequest();
refreshRequest.indices("test");
client("1").execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchQuery("field",
"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8"));
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test").types("test");
searchRequest.source(builder);
String doc = client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet()
.getHits().getAt(0).getSourceAsString();
assertEquals(doc,
"{\"field\":\"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8\"}");
}
}

@ -1,6 +1,4 @@
package org.xbib.elx.node;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -10,16 +8,17 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.junit.After;
import org.junit.Before;
import org.xbib.elx.common.util.NetworkUtils;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@ -31,47 +30,34 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class NodeTestUtils {
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class TestBase {
private static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random();
private static final Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private AtomicInteger counter = new AtomicInteger();
private String cluster;
protected String clusterName;
private String host;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
private int port;
@Before
public void startNodes() {
try {
logger.info("starting");
setClusterName();
setClusterName("test-cluster-" + System.getProperty("user.name"));
startNode("1");
findNodeAddress();
try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
@ -83,6 +69,12 @@ public class NodeTestUtils {
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
logger.info("host = {} port = {}", host, port);
} catch (Throwable t) {
logger.error("startNodes failed", t);
}
@ -107,24 +99,34 @@ public class NodeTestUtils {
}
}
protected void setClusterName() {
this.clusterName = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("user.name")
+ "-" + counter.incrementAndGet();
protected void setClusterName(String cluster) {
this.cluster = cluster;
}
protected String getClusterName() {
return cluster;
}
protected Settings getTransportSettings() {
return settingsBuilder()
.put("host", host)
.put("port", port)
.put("cluster.name", cluster)
.put("path.home", getHome())
.build();
}
protected Settings getNodeSettings() {
return settingsBuilder()
.put("cluster.name", clusterName)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
.put("discovery.zen.multicast.ping_timeout", "5s")
.put("http.enabled", true)
.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
.put("index.number_of_replicas", 0)
.put("cluster.name", cluster)
//.put("cluster.routing.schedule", "50ms")
//.put("cluster.routing.allocation.disk.threshold_enabled", false)
//.put("discovery.zen.multicast.enabled", true)
//.put("discovery.zen.multicast.ping_timeout", "5s")
//.put("http.enabled", true)
//.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
//.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
//.put("index.number_of_replicas", 0)
.put("path.home", getHome())
.build();
}
@ -133,28 +135,24 @@ public class NodeTestUtils {
return System.getProperty("path.home", System.getProperty("user.dir"));
}
public void startNode(String id) {
protected void startNode(String id) {
buildNode(id).start();
}
public AbstractClient client(String id) {
protected AbstractClient client(String id) {
return clients.get(id);
}
private void closeNodes() {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
node.close();
}
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress();
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
port = address.address().getPort();
}
nodes.clear();
logger.info("all nodes closed");
}
private Node buildNode(String id) {
@ -162,7 +160,6 @@ public class NodeTestUtils {
.put(getNodeSettings())
.put("name", id)
.build();
logger.info("settings={}", nodeSettings.getAsMap());
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
@ -179,4 +176,37 @@ public class NodeTestUtils {
}
return new String(buf);
}
private void closeNodes() {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
}

@ -1,18 +1,23 @@
package org.xbib.elx.common;
package org.xbib.elx.common.test;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import java.io.IOException;
public class WildcardTest extends NodeTestUtils {
public class WildcardTest extends TestBase {
protected Settings getNodeSettings() {
/*protected Settings getNodeSettings() {
return Settings.settingsBuilder()
.put(super.getNodeSettings())
.put("cluster.routing.allocation.disk.threshold_enabled", false)
@ -21,7 +26,7 @@ public class WildcardTest extends NodeTestUtils {
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
}
}*/
@Test
public void testWildcard() throws Exception {
@ -42,15 +47,19 @@ public class WildcardTest extends NodeTestUtils {
}
private void index(Client client, String id, String fieldValue) throws IOException {
client.index(new IndexRequest("index", "type", id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())
.refresh(true)).actionGet();
client.execute(IndexAction.INSTANCE, new IndexRequest("index", "type", id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())).actionGet();
client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet();
}
private long count(Client client, QueryBuilder queryBuilder) {
return client.prepareSearch("index").setTypes("type")
.setQuery(queryBuilder)
.execute().actionGet().getHits().getTotalHits();
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(queryBuilder);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("index");
searchRequest.types("type");
searchRequest.source(builder);
return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
}
private void validateCount(Client client, QueryBuilder queryBuilder, long expectedHits) {

@ -1,65 +0,0 @@
buildscript {
repositories {
jcenter()
maven {
url 'http://xbib.org/repository'
}
}
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:6.2.2.0"
}
}
apply plugin: 'org.xbib.gradle.plugin.elasticsearch.build'
configurations {
main
tests
}
dependencies {
compile project(':common')
compile "org.xbib:netty-http-client:${project.property('xbib-netty-http-client.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
}
jar {
baseName "${rootProject.name}-common"
}
/*
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
}
*/
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
}
test {
enabled = true
include '**/SimpleTest.*'
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
}
}
randomizedTest {
enabled = false
}
esTest {
enabled = true
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty 'tests.security.manager', 'true'
// maybe we like some extra security policy for our code
systemProperty 'tests.security.policy', '/extra-security.policy'
}
esTest.dependsOn jar, testJar

@ -1,65 +0,0 @@
buildscript {
repositories {
jcenter()
maven {
url 'http://xbib.org/repository'
}
}
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:6.2.3.4"
}
}
apply plugin: 'org.xbib.gradle.plugin.elasticsearch.build'
configurations {
main
tests
}
dependencies {
compile project(':common')
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
}
jar {
baseName "${rootProject.name}-node"
}
/*
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
}
*/
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
}
test {
enabled = false
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', projectDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
}
}
randomizedTest {
enabled = false
}
esTest {
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty 'tests.security.manager', 'true'
// maybe we like some extra security policy for our code
systemProperty 'tests.security.policy', '/extra-security.policy'
}
esTest.dependsOn jar, testJar

@ -34,7 +34,7 @@ public class ExtendedNodeClient extends AbstractExtendedClient {
.put("node.data", false)
.build();
logger.info("creating node client on {} with effective settings {}",
version, effectiveSettings.toString());
version, effectiveSettings.getAsMap());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
try {
@ -48,15 +48,10 @@ public class ExtendedNodeClient extends AbstractExtendedClient {
}
@Override
public void close() throws IOException {
super.close();
try {
if (node != null) {
logger.debug("closing node...");
node.close();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
protected void closeClient() {
if (node != null) {
logger.debug("closing node...");
node.close();
}
}

@ -1,30 +0,0 @@
package org.elasticsearch.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
public class MockNode extends Node {
public MockNode(Settings settings) {
super(settings);
}
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
}
public MockNode(Settings settings, Class<? extends Plugin> classpathPlugin) {
this(settings, list(classpathPlugin));
}
private static Collection<Class<? extends Plugin>> list(Class<? extends Plugin> classpathPlugin) {
Collection<Class<? extends Plugin>> list = new ArrayList<>();
list.add(classpathPlugin);
return list;
}
}

@ -1,77 +0,0 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Ignore
public class IndexShiftTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getSimpleName());
@Test
public void testIndexShift() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
try {
client.newIndex("test1234");
for (int i = 0; i < 1; i++) {
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.refreshIndex("test1234");
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.shiftIndex("test", "test1234", simpleAliases);
client.newIndex("test5678");
for (int i = 0; i < 1; i++) {
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.refreshIndex("test5678");
simpleAliases = Arrays.asList("d", "e", "f");
client.shiftIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> indexFilters = client.getIndexFilters("test5678");
logger.info("aliases of index test5678 = {}", indexFilters);
assertTrue(indexFilters.containsKey("a"));
assertTrue(indexFilters.containsKey("b"));
assertTrue(indexFilters.containsKey("c"));
assertTrue(indexFilters.containsKey("d"));
assertTrue(indexFilters.containsKey("e"));
Map<String, String> aliases = client.getIndexFilters(client.resolveAlias("test"));
logger.info("aliases of alias test = {}", aliases);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.waitForResponses(30L, TimeUnit.SECONDS);
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -1,4 +1,4 @@
package org.xbib.elx.node;
package org.xbib.elx.node.test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
@ -22,14 +22,16 @@ import org.junit.Before;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.ExtendedNodeClient;
import org.xbib.elx.node.ExtendedNodeClientProvider;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ClientTest extends NodeTestUtils {
public class ClientTest extends TestBase {
private static final Logger logger = LogManager.getLogger(ClientTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ClientTest.class.getName());
private static final Long ACTIONS = 25000L;
@ -144,7 +146,7 @@ public class ClientTest extends NodeTestUtils {
logger.info("NodeClient max={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads * 2)
.put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads)
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();

@ -1,23 +1,25 @@
package org.xbib.elx.node;
package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Ignore;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.ExtendedNodeClient;
import org.xbib.elx.node.ExtendedNodeClientProvider;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
public class DuplicateIDTest extends NodeTestUtils {
public class DuplicateIDTest extends TestBase {
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
@ -38,11 +40,13 @@ public class DuplicateIDTest extends NodeTestUtils {
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test")
.setTypes("test")
.setQuery(matchAllQuery());
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test");
searchRequest.types("test");
searchRequest.source(builder);
long hits = client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
logger.info("hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {

@ -0,0 +1,111 @@
package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.xbib.elx.api.IndexShiftResult;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.node.ExtendedNodeClient;
import org.xbib.elx.node.ExtendedNodeClientProvider;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class IndexShiftTest extends TestBase {
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName());
@Test
public void testIndexShift() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
try {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
client.newIndex("test1234", settings);
for (int i = 0; i < 1; i++) {
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
IndexShiftResult indexShiftResult =
client.shiftIndex("test", "test1234", Arrays.asList("a", "b", "c"));
assertTrue(indexShiftResult.getNewAliases().contains("a"));
assertTrue(indexShiftResult.getNewAliases().contains("b"));
assertTrue(indexShiftResult.getNewAliases().contains("c"));
assertTrue(indexShiftResult.getMovedAliases().isEmpty());
Map<String, String> aliases = client.getAliases("test1234");
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
String resolved = client.resolveAlias("test");
aliases = client.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
client.newIndex("test5678", settings);
for (int i = 0; i < 1; i++) {
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
indexShiftResult = client.shiftIndex("test", "test5678", Arrays.asList("d", "e", "f"),
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
);
assertTrue(indexShiftResult.getNewAliases().contains("d"));
assertTrue(indexShiftResult.getNewAliases().contains("e"));
assertTrue(indexShiftResult.getNewAliases().contains("f"));
assertTrue(indexShiftResult.getMovedAliases().contains("a"));
assertTrue(indexShiftResult.getMovedAliases().contains("b"));
assertTrue(indexShiftResult.getMovedAliases().contains("c"));
aliases = client.getAliases("test5678");
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = client.resolveAlias("test");
aliases = client.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.waitForResponses(30L, TimeUnit.SECONDS);
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
assertNull(client.getBulkController().getLastBulkError());
}
}
}

@ -0,0 +1,12 @@
package org.xbib.elx.node.test;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
public class MockNode extends Node {
public MockNode(Settings settings) {
super(settings);
}
}

@ -1,4 +1,4 @@
package org.xbib.elx.node;
package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -16,6 +16,8 @@ import org.elasticsearch.index.indexing.IndexingStats;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.node.ExtendedNodeClient;
import org.xbib.elx.node.ExtendedNodeClientProvider;
import java.util.HashMap;
import java.util.Map;
@ -26,9 +28,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@Ignore
public class ReplicaTest extends NodeTestUtils {
public class ReplicaTest extends TestBase {
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getName());
@Test
public void testReplicaLevel() throws Exception {

@ -1,4 +1,4 @@
package org.xbib.elx.node;
package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -7,15 +7,17 @@ import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.node.ExtendedNodeClient;
import org.xbib.elx.node.ExtendedNodeClientProvider;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class SmokeTest extends NodeTestUtils {
public class SmokeTest extends TestBase {
private static final Logger logger = LogManager.getLogger(SmokeTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(SmokeTest.class.getName());
@Test
public void smokeTest() throws Exception {
@ -28,7 +30,7 @@ public class SmokeTest extends NodeTestUtils {
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
assertEquals(clusterName, client.getClusterName());
assertEquals(getClusterName(), client.getClusterName());
client.checkMapping("test");

@ -1,4 +1,4 @@
package org.xbib.elx.common;
package org.xbib.elx.node.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -8,16 +8,17 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.junit.After;
import org.junit.Before;
import org.xbib.elx.common.util.NetworkUtils;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@ -29,54 +30,32 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class NodeTestUtils {
public class TestBase {
private static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random();
private static final Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private AtomicInteger counter = new AtomicInteger();
private String cluster;
private String host;
private int port;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
@Before
public void startNodes() {
try {
logger.info("starting");
setClusterName();
setClusterName("test-cluster-" + System.getProperty("user.name"));
startNode("1");
findNodeAddress();
try {
@ -90,6 +69,12 @@ public class NodeTestUtils {
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
logger.info("host = {} port = {}", host, port);
} catch (Throwable t) {
logger.error("startNodes failed", t);
}
@ -114,18 +99,15 @@ public class NodeTestUtils {
}
}
protected void setClusterName() {
this.cluster = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("user.name")
+ "-" + counter.incrementAndGet();
protected void setClusterName(String cluster) {
this.cluster = cluster;
}
protected String getClusterName() {
return cluster;
}
protected Settings getSettings() {
protected Settings getTransportSettings() {
return settingsBuilder()
.put("host", host)
.put("port", port)
@ -137,14 +119,14 @@ public class NodeTestUtils {
protected Settings getNodeSettings() {
return settingsBuilder()
.put("cluster.name", cluster)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
.put("discovery.zen.multicast.ping_timeout", "5s")
.put("http.enabled", true)
.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
.put("index.number_of_replicas", 0)
//.put("cluster.routing.schedule", "50ms")
//.put("cluster.routing.allocation.disk.threshold_enabled", false)
//.put("discovery.zen.multicast.enabled", true)
//.put("discovery.zen.multicast.ping_timeout", "5s")
//.put("http.enabled", true)
//.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
//.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
//.put("index.number_of_replicas", 0)
.put("path.home", getHome())
.build();
}
@ -153,30 +135,14 @@ public class NodeTestUtils {
return System.getProperty("path.home", System.getProperty("user.dir"));
}
public void startNode(String id) {
protected void startNode(String id) {
buildNode(id).start();
}
public AbstractClient client(String id) {
protected AbstractClient client(String id) {
return clients.get(id);
}
private void closeNodes() {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
@ -210,4 +176,37 @@ public class NodeTestUtils {
}
return new String(buf);
}
private void closeNodes() {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
}

@ -1,63 +0,0 @@
buildscript {
repositories {
jcenter()
maven {
url 'http://xbib.org/repository'
}
}
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:6.2.2.0"
}
}
apply plugin: 'org.xbib.gradle.plugin.elasticsearch.build'
configurations {
main
tests
}
dependencies {
compile project(':common')
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${project.property('elasticsearch-devkit.version')}"
}
jar {
baseName "${rootProject.name}-transport"
}
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
}
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
}
esTest {
enabled = true
// test with the jars, not the classes, for security manager
classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty 'tests.security.manager', 'true'
// maybe we like some extra security policy for our code
systemProperty 'tests.security.policy', '/extra-security.policy'
}
esTest.dependsOn jar, testJar
randomizedTest {
enabled = false
}
test {
enabled = false
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', projectDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
}
}

@ -57,6 +57,15 @@ public class ExtendedTransportClient extends AbstractExtendedClient {
return null;
}
@Override
protected void closeClient() {
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
client.close();
client.threadPool().shutdown();
}
}
@Override
public ExtendedTransportClient init(Settings settings) throws IOException {
super.init(settings);
@ -73,18 +82,6 @@ public class ExtendedTransportClient extends AbstractExtendedClient {
return this;
}
@Override
public synchronized void close() throws IOException {
super.close();
logger.info("closing");
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
client.close();
client.threadPool().shutdown();
}
logger.info("close completed");
}
private Collection<InetSocketTransportAddress> findAddresses(Settings settings) throws IOException {
final int defaultPort = settings.getAsInt("port", 9300);
Collection<InetSocketTransportAddress> addresses = new ArrayList<>();

@ -1,34 +0,0 @@
package org.elasticsearch.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
public class MockNode extends Node {
public MockNode() {
super(Settings.EMPTY);
}
public MockNode(Settings settings) {
super(settings);
}
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
}
public MockNode(Settings settings, Class<? extends Plugin> classpathPlugin) {
this(settings, list(classpathPlugin));
}
private static Collection<Class<? extends Plugin>> list(Class<? extends Plugin> classpathPlugin) {
Collection<Class<? extends Plugin>> list = new ArrayList<>();
list.add(classpathPlugin);
return list;
}
}

@ -28,9 +28,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ClientTest extends NodeTestUtils {
public class ClientTest extends TestBase {
private static final Logger logger = LogManager.getLogger(ClientTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ClientTest.class.getName());
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
@ -50,7 +50,7 @@ public class ClientTest extends NodeTestUtils {
public void testClientIndexOp() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();
client.newIndex("test");
@ -69,7 +69,7 @@ public class ClientTest extends NodeTestUtils {
public void testSingleDoc() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();
@ -94,7 +94,7 @@ public class ClientTest extends NodeTestUtils {
public void testMapping() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
XContentBuilder builder = jsonBuilder()
@ -121,7 +121,7 @@ public class ClientTest extends NodeTestUtils {
long numactions = ACTIONS;
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();
@ -157,7 +157,7 @@ public class ClientTest extends NodeTestUtils {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxactions)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();

@ -3,22 +3,23 @@ package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class DuplicateIDTest extends NodeTestUtils {
public class DuplicateIDTest extends TestBase {
private final static Logger logger = LogManager.getLogger(DuplicateIDTest.class.getSimpleName());
private final static Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
private final static Long MAX_ACTIONS_PER_REQUEST = 1000L;
@ -29,22 +30,24 @@ public class DuplicateIDTest extends NodeTestUtils {
long numactions = ACTIONS;
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
.build();
try {
client.newIndex("test");
for (int i = 0; i < ACTIONS; i++) {
client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test")
.setTypes("test")
.setQuery(matchAllQuery());
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test");
searchRequest.types("test");
searchRequest.source(builder);
long hits = client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
logger.info("hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {

@ -2,66 +2,99 @@ package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.xbib.elx.api.IndexShiftResult;
import org.xbib.elx.common.ClientBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class IndexShiftTest extends NodeTestUtils {
public class IndexShiftTest extends TestBase {
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName());
@Test
public void testIndexAlias() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings()).build();
.put(getTransportSettings()).build();
try {
client.newIndex("test1234");
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
client.newIndex("test1234", settings);
for (int i = 0; i < 1; i++) {
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.refreshIndex("test1234");
client.waitForResponses(30L, TimeUnit.SECONDS);
IndexShiftResult indexShiftResult =
client.shiftIndex("test", "test1234", Arrays.asList("a", "b", "c"));
assertTrue(indexShiftResult.getNewAliases().contains("a"));
assertTrue(indexShiftResult.getNewAliases().contains("b"));
assertTrue(indexShiftResult.getNewAliases().contains("c"));
assertTrue(indexShiftResult.getMovedAliases().isEmpty());
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.shiftIndex("test", "test1234", simpleAliases);
Map<String, String> aliases = client.getAliases("test1234");
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
String resolved = client.resolveAlias("test");
aliases = client.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("test"));
client.newIndex("test5678");
client.newIndex("test5678", settings);
for (int i = 0; i < 1; i++) {
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flush();
client.refreshIndex("test5678");
client.waitForResponses(30L, TimeUnit.SECONDS);
simpleAliases = Arrays.asList("d", "e", "f");
client.shiftIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> indexFilters = client.getIndexFilters("test5678");
logger.info("index filters of index test5678 = {}", indexFilters);
assertTrue(indexFilters.containsKey("a"));
assertTrue(indexFilters.containsKey("b"));
assertTrue(indexFilters.containsKey("c"));
assertTrue(indexFilters.containsKey("d"));
assertTrue(indexFilters.containsKey("e"));
indexShiftResult = client.shiftIndex("test", "test5678", Arrays.asList("d", "e", "f"),
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
);
assertTrue(indexShiftResult.getNewAliases().contains("d"));
assertTrue(indexShiftResult.getNewAliases().contains("e"));
assertTrue(indexShiftResult.getNewAliases().contains("f"));
assertTrue(indexShiftResult.getMovedAliases().contains("a"));
assertTrue(indexShiftResult.getMovedAliases().contains("b"));
assertTrue(indexShiftResult.getMovedAliases().contains("c"));
Map<String, String> aliases = client.getIndexFilters(client.resolveAlias("test"));
logger.info("aliases of alias test = {}", aliases);
aliases = client.getAliases("test5678");
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
resolved = client.resolveAlias("test");
aliases = client.getAliases(resolved);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
assertTrue(aliases.containsKey("f"));
client.waitForResponses(30L, TimeUnit.SECONDS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

@ -0,0 +1,11 @@
package org.xbib.elx.transport;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
public class MockNode extends Node {
public MockNode(Settings settings) {
super(settings);
}
}

@ -24,9 +24,9 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ReplicaTest extends NodeTestUtils {
public class ReplicaTest extends TestBase {
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(ReplicaTest.class.getName());
@Test
public void testReplicaLevel() throws Exception {
@ -48,7 +48,7 @@ public class ReplicaTest extends NodeTestUtils {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.build();
try {
@ -119,7 +119,7 @@ public class ReplicaTest extends NodeTestUtils {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.build();
Settings settings = Settings.settingsBuilder()

@ -3,7 +3,9 @@ package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.common.ClientBuilder;
import java.util.concurrent.TimeUnit;
@ -11,25 +13,51 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class SmokeTest extends NodeTestUtils {
public class SmokeTest extends TestBase {
private static final Logger logger = LogManager.getLogger(SmokeTest.class.getSimpleName());
private static final Logger logger = LogManager.getLogger(SmokeTest.class.getName());
@Test
public void testSingleDocNodeClient() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(getTransportSettings())
.build();
try {
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
assertEquals(getClusterName(), client.getClusterName());
client.checkMapping("test");
client.update("test", "1", "{ \"name\" : \"Another name\"}");
client.flush();
client.waitForRecovery("test", 10L, TimeUnit.SECONDS);
client.delete("test", "1");
client.deleteIndex("test");
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test2", Settings.settingsBuilder()
.build());
assertEquals(0, indexDefinition.getReplicaLevel());
client.newIndex(indexDefinition);
client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
client.flush();
client.updateReplicaLevel(indexDefinition, 2);
int replica = client.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
assertEquals(0, client.getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
client.close();
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());

@ -8,16 +8,17 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.junit.After;
import org.junit.Before;
import org.xbib.elx.common.util.NetworkUtils;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@ -29,54 +30,32 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class NodeTestUtils {
public class TestBase {
private static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random();
private static final Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private AtomicInteger counter = new AtomicInteger();
private String cluster;
private String host;
private int port;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
@Before
public void startNodes() {
try {
logger.info("starting");
setClusterName();
setClusterName("test-cluster-" + System.getProperty("user.name"));
startNode("1");
findNodeAddress();
try {
@ -90,6 +69,12 @@ public class NodeTestUtils {
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
}
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateResponse clusterStateResponse =
client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
logger.info("host = {} port = {}", host, port);
} catch (Throwable t) {
logger.error("startNodes failed", t);
}
@ -114,18 +99,15 @@ public class NodeTestUtils {
}
}
protected void setClusterName() {
this.cluster = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("user.name")
+ "-" + counter.incrementAndGet();
protected void setClusterName(String cluster) {
this.cluster = cluster;
}
protected String getClusterName() {
return cluster;
}
protected Settings getSettings() {
protected Settings getTransportSettings() {
return settingsBuilder()
.put("host", host)
.put("port", port)
@ -137,14 +119,14 @@ public class NodeTestUtils {
protected Settings getNodeSettings() {
return settingsBuilder()
.put("cluster.name", cluster)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
.put("discovery.zen.multicast.ping_timeout", "5s")
.put("http.enabled", true)
.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
.put("index.number_of_replicas", 0)
//.put("cluster.routing.schedule", "50ms")
//.put("cluster.routing.allocation.disk.threshold_enabled", false)
//.put("discovery.zen.multicast.enabled", true)
//.put("discovery.zen.multicast.ping_timeout", "5s")
//.put("http.enabled", true)
//.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
//.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
//.put("index.number_of_replicas", 0)
.put("path.home", getHome())
.build();
}
@ -153,30 +135,14 @@ public class NodeTestUtils {
return System.getProperty("path.home", System.getProperty("user.dir"));
}
public void startNode(String id) {
protected void startNode(String id) {
buildNode(id).start();
}
public AbstractClient client(String id) {
protected AbstractClient client(String id) {
return clients.get(id);
}
private void closeNodes() {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
@ -194,7 +160,6 @@ public class NodeTestUtils {
.put(getNodeSettings())
.put("name", id)
.build();
logger.info("settings={}", nodeSettings.getAsMap());
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
@ -211,4 +176,37 @@ public class NodeTestUtils {
}
return new String(buf);
}
private void closeNodes() {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
}

@ -6,7 +6,7 @@
</Console>
</appenders>
<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="Console" />
</Root>
</Loggers>

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 2.2.1.5
version = 2.2.1.6
xbib-metrics.version = 1.1.0
xbib-guice.version = 4.0.4

@ -1,104 +0,0 @@
task xbibUpload(type: Upload) {
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty("xbibUsername")) {
mavenDeployer {
configuration = configurations.wagon
repository(url: 'scpexe://xbib.org/repository') {
authentication(userName: xbibUsername, privateKey: xbibPrivateKey)
}
}
}
}
}
task sonaTypeUpload(type: Upload) {
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty('ossrhUsername')) {
mavenDeployer {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') {
authentication(userName: ossrhUsername, password: ossrhPassword)
}
snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') {
authentication(userName: ossrhUsername, password: ossrhPassword)
}
pom.project {
name name
description description
packaging 'jar'
inceptionYear '2012'
url scmUrl
organization {
name 'xbib'
url 'http://xbib.org'
}
developers {
developer {
id user
name 'Jörg Prante'
email 'joergprante@gmail.com'
url 'https://github.com/jprante'
}
}
scm {
url scmUrl
connection scmConnection
developerConnection scmDeveloperConnection
}
licenses {
license {
name 'The Apache License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
}
}
}
}
}
task hbzUpload(type: Upload) {
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty('hbzUserName')) {
mavenDeployer {
configuration = configurations.wagon
beforeDeployment { MavenDeployment deployment ->
signing.signPom(deployment)
}
repository(url: uri(hbzUrl)) {
authentication(userName: hbzUserName, privateKey: hbzPrivateKey)
}
pom.project {
developers {
developer {
id 'jprante'
name 'Jörg Prante'
email 'joergprante@gmail.com'
url 'https://github.com/jprante'
}
}
scm {
url 'https://github.com/xbib/elasticsearch-webapp-libraryservice'
connection 'scm:git:git://github.com/xbib/elasticsaerch-webapp-libraryservice.git'
developerConnection 'scm:git:git://github.com/xbib/elasticsaerch-webapp-libraryservice.git'
}
inceptionYear '2016'
licenses {
license {
name 'The Apache License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
}
}
}
}
}
Loading…
Cancel
Save