update to elx API of 7.6 branch

This commit is contained in:
Jörg Prante 2020-05-11 23:49:55 +02:00
parent 6702452982
commit 539c77cbd4
32 changed files with 650 additions and 591 deletions

View file

@ -1,12 +1,3 @@
language: java
sudo: required
jdk:
- openjdk11
cache:
directories:
- $HOME/.m2
after_success:
- ./gradlew sonarqube -Dsonar.host.url=https://sonarqube.com -Dsonar.login=$SONAR_TOKEN
env:
global:
secure: n1Ai4q/yMLn/Pg5pA4lTavoJoe7mQYB1PSKnZAqwbgyla94ySzK6iyBCBiNs/foMPisB/x+DHvmUXTsjvquw9Ay48ZITCV3xhcWzD0eZM2TMoG19CpRAEe8L8LNuYiti9k89ijDdUGZ5ifsvQNTGNHksouayAuApC3PrTUejJfR6SYrp1ZsQTbsMlr+4XU3p7QknK5rGgOwATIMP28F+bVnB05WJtlJA3b0SeucCurn3wJ4FGBQXRYmdlT7bQhNE4QgZM1VzcUFD/K0TBxzzq/otb/lNRSifyoekktDmJwQnaT9uQ4R8R6KdQ2Kb38Rvgjur+TKm5i1G8qS2+6LnIxQJG1aw3JvKK6W0wWCgnAVVRrXaCLday9NuY59tuh1mfjQ10UcsMNKcTdcKEMrLow506wSETcXc7L/LEnneWQyJJeV4vhPqR7KJfsBbeqgz3yIfsCn1GZVWFlfegzYCN52YTl0Y0uRD2Z+TnzQu+Bf4DzaWXLge1rz31xkhyeNNspub4h024+XqBjcMm6M9mlMzmmK8t2DIwPy/BlQbFBUyhrxziuR/5/2NEDPyHltvWkRb4AUIa25WJqkV0gTBegbMadZ9DyOo6Ea7aoVFBae2WGR08F1kzABsWrd1S7UJmWxW35iyMEtoAIayXphIK98qO5aCutwZ+3iOQazxbAs=

View file

@ -2,7 +2,11 @@ plugins {
id "org.sonarqube" version "2.8"
id "io.codearte.nexus-staging" version "0.21.1"
id "com.github.spotbugs" version "2.0.1"
id "org.xbib.gradle.plugin.asciidoctor" version "1.5.6.0.1"
id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.1"
}
if (JavaVersion.current() < JavaVersion.VERSION_11) {
throw new GradleException("This build must be run with Java/OpenJDK 11+")
}
subprojects {
@ -19,10 +23,10 @@ subprojects {
dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-api:${project.property('junit.version')}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}"
testImplementation "org.apache.logging.log4j:log4j-core:${project.property('log4j.version')}"
testImplementation "org.apache.logging.log4j:log4j-jul:${project.property('log4j.version')}"
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:${project.property('log4j.version')}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}"
asciidoclet "org.xbib:asciidoclet:${project.property('asciidoclet.version')}"
}
@ -46,14 +50,11 @@ subprojects {
test {
enabled = true
useJUnitPlatform()
// we MUST use this hack because of Elasticsearch 2.2.1 Lucene 5.4.1 MMapDirectory unmap() hackery
doFirst {
jvmArgs = [
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED'
]
}
jvmArgs = [
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED'
]
systemProperty 'java.util.logging.manager', 'org.apache.logging.log4j.jul.LogManager'
systemProperty 'jna.debug_load', 'true'
systemProperty 'path.home', "${project.buildDir}"

View file

@ -14,6 +14,10 @@ public interface BulkController extends Closeable, Flushable {
void init(Settings settings);
void inactivate();
BulkMetric getBulkMetric();
Throwable getLastBulkError();
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
@ -21,16 +25,15 @@ public interface BulkController extends Closeable, Flushable {
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
void index(IndexRequest indexRequest);
void bulkIndex(IndexRequest indexRequest);
void delete(DeleteRequest deleteRequest);
void bulkDelete(DeleteRequest deleteRequest);
void update(UpdateRequest updateRequest);
void bulkUpdate(UpdateRequest updateRequest);
boolean waitForResponses(long timeout, TimeUnit timeUnit);
boolean waitForBulkResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
}
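Illustrative only (not part of this commit): a minimal sketch of driving the renamed bulk methods on BulkController, assuming an initialized ExtendedClient; the index name, document id, source, and refresh intervals are made-up placeholders.

static void bulkOneDocument(ExtendedClient client) throws IOException {
    // hypothetical driver for the renamed API; all values are placeholders
    BulkController controller = client.getBulkController();
    controller.startBulkMode("example_index", -1L, 1L); // assumed start/stop refresh intervals in seconds
    controller.bulkIndex(new IndexRequest().index("example_index").type("doc").id("1")
            .source("{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8)));
    if (!controller.waitForBulkResponses(30L, TimeUnit.SECONDS)) {
        throw new IOException("bulk did not complete", controller.getLastBulkError());
    }
    controller.stopBulkMode("example_index", 30L, TimeUnit.SECONDS);
}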

View file

@ -0,0 +1,43 @@
package org.xbib.elx.api;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
public interface BulkListener {
/**
* Callback before the bulk is executed.
*
* @param executionId execution ID
* @param request request
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of bulk request.
*
* @param executionId execution ID
* @param request request
* @param response response
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of bulk request.
*
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*
* @param executionId execution ID
* @param request request
* @param failure failure
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
/**
* Get the last bulk error.
* @return the last bulk error
*/
Throwable getLastBulkError();
}
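For illustration (not from this commit): a minimal implementation of the new BulkListener interface; the class name and the remembered-error field are hypothetical.

public class RecordingBulkListener implements BulkListener {

    private volatile Throwable lastBulkError;

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // e.g. note request.numberOfActions() before the bulk goes out
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // e.g. inspect response.hasFailures() after a successful round trip
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        lastBulkError = failure; // keep the failure so getLastBulkError() can report it
    }

    @Override
    public Throwable getLastBulkError() {
        return lastBulkError;
    }
}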

View file

@ -1,6 +1,5 @@
package org.xbib.elx.api;
import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@ -8,8 +7,6 @@ import java.io.Closeable;
public interface BulkMetric extends Closeable {
void init(Settings settings);
Metered getTotalIngest();
Count getTotalIngestSizeInBytes();

View file

@ -1,8 +1,6 @@
package org.xbib.elx.api;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import java.io.Closeable;
import java.io.Flushable;
@ -13,54 +11,9 @@ public interface BulkProcessor extends Closeable, Flushable {
@SuppressWarnings("rawtypes")
BulkProcessor add(ActionRequest request);
@SuppressWarnings("rawtypes")
BulkProcessor add(ActionRequest request, Object payload);
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}
/**
* A listener for the execution.
*/
interface Listener {
/**
* Callback before the bulk is executed.
*
* @param executionId execution ID
* @param request request
*/
void beforeBulk(long executionId, BulkRequest request);
/**
* Callback after a successful execution of bulk request.
*
* @param executionId execution ID
* @param request request
* @param response response
*/
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
/**
* Callback after a failed execution of bulk request.
*
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*
* @param executionId execution ID
* @param request request
* @param failure failure
*/
void afterBulk(long executionId, BulkRequest request, Throwable failure);
}
BulkListener getBulkListener();
}

View file

@ -0,0 +1,11 @@
package org.xbib.elx.api;
import org.elasticsearch.action.bulk.BulkRequest;
import java.util.concurrent.TimeUnit;
public interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}
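A rough sketch (not part of the commit) of a synchronous BulkRequestHandler, mirroring the idea of the SyncBulkRequestHandler further down in this commit; the class name is hypothetical and error handling is reduced to the listener callbacks.

class DirectBulkRequestHandler implements BulkRequestHandler {

    private final ElasticsearchClient client;

    private final BulkListener listener;

    DirectBulkRequestHandler(ElasticsearchClient client, BulkListener listener) {
        this.client = client;
        this.listener = listener;
    }

    @Override
    public void execute(BulkRequest bulkRequest, long executionId) {
        listener.beforeBulk(executionId, bulkRequest);
        try {
            BulkResponse response = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
            listener.afterBulk(executionId, bulkRequest, response);
        } catch (Exception e) {
            listener.afterBulk(executionId, bulkRequest, e);
        }
    }

    @Override
    public boolean close(long timeout, TimeUnit unit) {
        return true; // nothing is queued in a synchronous handler
    }
}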

View file

@ -6,6 +6,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.Closeable;
import java.io.Flushable;
@ -34,12 +35,6 @@ public interface ExtendedClient extends Flushable, Closeable {
*/
ElasticsearchClient getClient();
/**
* Get bulk metric.
* @return the bulk metric
*/
BulkMetric getBulkMetric();
/**
* Get bulk control.
* @return the bulk control
@ -190,6 +185,8 @@ public interface ExtendedClient extends Flushable, Closeable {
*/
ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException;
ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException;
/**
* Create a new index.
*
@ -199,7 +196,7 @@ public interface ExtendedClient extends Flushable, Closeable {
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException;
ExtendedClient newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException;
/**
* Create a new index.
@ -317,7 +314,7 @@ public interface ExtendedClient extends Flushable, Closeable {
/**
* Force segment merge of an index.
* @param indexDefinition th eindex definition
* @param indexDefinition the index definition
* @return this
*/
boolean forceMerge(IndexDefinition indexDefinition);
@ -398,7 +395,7 @@ public interface ExtendedClient extends Flushable, Closeable {
String resolveMostRecentIndex(String alias);
/**
* Get all aliases.
* Get all index aliases.
* @param index the index
* @return map of index aliases
*/
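To make the new newIndex overloads concrete, a sketch only (not from the commit): passing an XContentBuilder mapping directly; the index and field names are made up, and the exact mapping shape (with or without the type as root object) may need adjusting.

static void createArticleIndex(ExtendedClient extendedClient) throws IOException {
    // hypothetical mapping built with JsonXContent; "articles" and "title" are placeholders
    XContentBuilder mapping = JsonXContent.contentBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("title").field("type", "string").endObject()
                .endObject()
            .endObject();
    extendedClient.newIndex("articles", Settings.EMPTY, mapping);
}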

View file

@ -4,7 +4,7 @@ import java.util.List;
public interface IndexPruneResult {
enum State { NOTHING_TO_DO, SUCCESS, NONE };
enum State { NOTHING_TO_DO, SUCCESS, NONE, FAIL };
State getState();
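The added FAIL state lets callers tell an unacknowledged deletion apart from a successful prune; a hypothetical helper reacting to it might look like this (sketch only, not part of the commit).

static void logPruneOutcome(IndexPruneResult result, Logger logger) {
    // hypothetical reporting helper branching on the new FAIL state
    if (result.getState() == IndexPruneResult.State.FAIL) {
        logger.warn("prune not acknowledged, candidates = {}", result.getCandidateIndices());
    } else if (result.getState() == IndexPruneResult.State.SUCCESS) {
        logger.info("pruned indices: {}", result.getDeletedIndices());
    }
}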

View file

@ -1,6 +1,5 @@
package org.xbib.elx.api;
@FunctionalInterface
public interface ReadClientProvider<C extends ReadClient> {
C getReadClient();

View file

@ -2,6 +2,7 @@ package org.xbib.elx.common;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
@ -58,6 +59,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -67,7 +69,6 @@ import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition;
@ -85,7 +86,6 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -108,22 +108,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName());
/**
* The one and only index type name used in the extended client.
* Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
*/
private static final String TYPE_NAME = "doc";
/**
* The Elasticsearch client.
*/
private ElasticsearchClient client;
private BulkMetric bulkMetric;
private BulkController bulkController;
private AtomicBoolean closed;
private final AtomicBoolean closed;
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
@Override
@ -178,11 +167,6 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
return client;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public BulkController getBulkController() {
return bulkController;
@ -190,15 +174,12 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public AbstractExtendedClient init(Settings settings) throws IOException {
logger.info("initializing with settings = " + settings.toDelimitedString(','));
if (client == null) {
client = createClient(settings);
}
if (bulkMetric == null) {
this.bulkMetric = new DefaultBulkMetric();
this.bulkMetric.init(settings);
}
if (bulkController == null) {
this.bulkController = new DefaultBulkController(this, bulkMetric);
this.bulkController = new DefaultBulkController(this);
bulkController.init(settings);
}
return this;
@ -213,12 +194,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void close() throws IOException {
ensureActive();
ensureClient();
if (closed.compareAndSet(false, true)) {
if (bulkMetric != null) {
logger.info("closing bulk metric");
bulkMetric.close();
}
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();
@ -229,9 +206,9 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getClusterName() {
ensureActive();
ensureClient();
try {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear();
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
return clusterStateResponse.getClusterName().value();
@ -249,7 +226,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException {
ensureActive();
ensureClient();
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
URL indexSettings = indexDefinition.getSettingsUrl();
if (indexSettings == null) {
@ -284,7 +261,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index) throws IOException {
return newIndex(index, Settings.EMPTY, (Map<String, Object>) null);
return newIndex(index, Settings.EMPTY, (Map<String, ?>) null);
}
@Override
@ -296,7 +273,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient newIndex(String index, Settings settings) throws IOException {
return newIndex(index, settings, (Map<String, Object>) null);
return newIndex(index, settings, (Map<String, ?>) null);
}
@Override
@ -306,8 +283,8 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@Override
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) {
ensureActive();
public ExtendedClient newIndex(String index, Settings settings, XContentBuilder mapping) {
ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
@ -317,11 +294,35 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
createIndexRequest.settings(settings);
}
if (mapping != null) {
createIndexRequest.mapping(TYPE_NAME, mapping);
createIndexRequest.mapping("doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
logger.info("index {} created: {}", index, createIndexResponse);
return this;
if (createIndexResponse.isAcknowledged()) {
return this;
}
throw new IllegalStateException("index creation not acknowledged: " + index);
}
@Override
public ExtendedClient newIndex(String index, Settings settings, Map<String, ?> mapping) {
ensureClient();
if (index == null) {
logger.warn("no index name given to create index");
return this;
}
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
if (settings != null) {
createIndexRequest.settings(settings);
}
if (mapping != null) {
createIndexRequest.mapping("doc", mapping);
}
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
if (createIndexResponse.isAcknowledged()) {
return this;
}
throw new IllegalStateException("index creation not acknowledged: " + index);
}
@Override
@ -331,7 +332,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient deleteIndex(String index) {
ensureActive();
ensureClient();
if (index == null) {
logger.warn("no index name given to delete index");
return this;
@ -351,7 +352,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
if (bulkController != null) {
ensureActive();
ensureClient();
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
}
return this;
@ -360,7 +361,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException {
if (bulkController != null) {
ensureActive();
ensureClient();
bulkController.stopBulkMode(indexDefinition);
}
return this;
@ -369,7 +370,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
if (bulkController != null) {
ensureActive();
ensureClient();
bulkController.stopBulkMode(index, timeout, timeUnit);
}
return this;
@ -377,63 +378,63 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient index(String index, String id, boolean create, String source) {
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
return index(new IndexRequest().index(index).type("doc").id(id).create(create)
.source(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
return index(new IndexRequest().index(index).type("doc").id(id).create(create)
.source(source));
}
@Override
public ExtendedClient index(IndexRequest indexRequest) {
ensureActive();
bulkController.index(indexRequest);
ensureClient();
bulkController.bulkIndex(indexRequest);
return this;
}
@Override
public ExtendedClient delete(String index, String id) {
return delete(new DeleteRequest(index, TYPE_NAME, id));
return delete(new DeleteRequest().index(index).type("doc").id(id));
}
@Override
public ExtendedClient delete(DeleteRequest deleteRequest) {
ensureActive();
bulkController.delete(deleteRequest);
ensureClient();
bulkController.bulkDelete(deleteRequest);
return this;
}
@Override
public ExtendedClient update(String index, String id, BytesReference source) {
return update(new UpdateRequest(index, TYPE_NAME, id)
return update(new UpdateRequest().index(index).type("doc").id(id)
.doc(source));
}
@Override
public ExtendedClient update(String index, String id, String source) {
return update(new UpdateRequest(index, TYPE_NAME, id)
return update(new UpdateRequest().index(index).type("doc").id(id)
.doc(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
public ExtendedClient update(UpdateRequest updateRequest) {
ensureActive();
bulkController.update(updateRequest);
ensureClient();
bulkController.bulkUpdate(updateRequest);
return this;
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
ensureActive();
return bulkController.waitForResponses(timeout, timeUnit);
ensureClient();
return bulkController.waitForBulkResponses(timeout, timeUnit);
}
@Override
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureClient();
ensureIndexGiven(index);
GetSettingsRequest settingsRequest = new GetSettingsRequest();
settingsRequest.indices(index);
@ -448,7 +449,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
logger.error("timeout waiting for recovery");
logger.warn("timeout waiting for recovery");
return false;
}
}
@ -457,15 +458,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureClient();
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
if (logger.isErrorEnabled()) {
logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
}
logger.warn("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
return false;
}
return true;
@ -473,7 +472,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
ensureActive();
ensureClient();
try {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
@ -530,7 +529,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient flushIndex(String index) {
if (index != null) {
ensureActive();
ensureClient();
client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
}
return this;
@ -539,7 +538,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public ExtendedClient refreshIndex(String index) {
if (index != null) {
ensureActive();
ensureClient();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet();
}
return this;
@ -550,7 +549,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (alias == null) {
return null;
}
ensureActive();
ensureClient();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@ -574,9 +573,13 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public String resolveAlias(String alias) {
ensureActive();
ensureClient();
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.blocks(false);
clusterStateRequest.metaData(true);
clusterStateRequest.nodes(false);
clusterStateRequest.routingTable(false);
clusterStateRequest.customs(false);
ClusterStateResponse clusterStateResponse =
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
SortedMap<String, AliasOrIndex> map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup();
@ -612,7 +615,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public IndexShiftResult shiftIndex(String index, String fullIndexName,
List<String> additionalAliases, IndexAliasAdder adder) {
ensureActive();
ensureClient();
if (index == null) {
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
}
@ -706,11 +709,11 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
if (index.equals(fullIndexName)) {
return EMPTY_INDEX_PRUNE_RESULT;
}
ensureActive();
ensureClient();
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
logger.info("{} indices", getIndexResponse.getIndices().length);
logger.info("pruneIndex: total of {} indices", getIndexResponse.getIndices().length);
List<String> candidateIndices = new ArrayList<>();
for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s);
@ -746,20 +749,29 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest()
.indices(indicesToDelete.toArray(s));
DeleteIndexResponse response = client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
return new SuccessPruneResult(candidateIndices, indicesToDelete, response);
if (response.isAcknowledged()) {
logger.log(Level.INFO, "deletion of {} acknowledged, waiting for GREEN", Arrays.asList(s));
waitForCluster("GREEN", 30L, TimeUnit.SECONDS);
return new SuccessPruneResult(candidateIndices, indicesToDelete, response);
} else {
logger.log(Level.WARN, "deletion of {} not acknowledged", Arrays.asList(s));
return new FailPruneResult(candidateIndices, indicesToDelete, response);
}
}
@Override
public Long mostRecentDocument(String index, String timestampfieldname) {
ensureActive();
SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.field(timestampfieldname);
sourceBuilder.size(1);
sourceBuilder.sort(sort);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.source(sourceBuilder);
ensureClient();
SortBuilder sort = SortBuilders
.fieldSort(timestampfieldname)
.order(SortOrder.DESC);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.sort(sort)
.field(timestampfieldname)
.size(1);
SearchRequest searchRequest = new SearchRequest()
.indices(index)
.source(sourceBuilder);
SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
if (searchResponse.getHits().getHits().length == 1) {
SearchHit hit = searchResponse.getHits().getHits()[0];
@ -837,7 +849,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
@Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
ensureActive();
ensureClient();
if (index == null) {
throw new IOException("no index name given");
}
@ -854,7 +866,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
}
private void ensureActive() {
private void ensureClient() {
if (this instanceof MockExtendedClient) {
return;
}
@ -886,7 +898,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
public void checkMapping(String index) {
ensureActive();
ensureClient();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
@ -902,25 +914,24 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private void checkMapping(String index, String type, MappingMetaData mappingMetaData) {
try {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
SearchResponse searchResponse = searchRequestBuilder.setSize(0)
.setIndices(index)
.setTypes(type)
.setQuery(QueryBuilders.matchAllQuery())
.execute()
.actionGet();
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.size(0);
SearchRequest searchRequest = new SearchRequest()
.indices(index)
.source(builder);
SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
long total = searchResponse.getHits().getTotalHits();
if (total > 0L) {
Map<String, Long> fields = new TreeMap<>();
Map<String, Object> root = mappingMetaData.getSourceAsMap();
checkMapping(index, type, "", "", root, fields);
checkMapping(index, "", "", root, fields);
AtomicInteger empty = new AtomicInteger();
Map<String, Long> map = sortByValue(fields);
map.forEach((key, value) -> {
logger.info("{} {} {}",
key,
value,
(double) value * 100 / total);
key, value, (double) value * 100 / total);
if (value == 0) {
empty.incrementAndGet();
}
@ -934,7 +945,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
@SuppressWarnings("unchecked")
private void checkMapping(String index, String type,
private void checkMapping(String index,
String pathDef, String fieldName, Map<String, Object> map,
Map<String, Long> fields) {
String path = pathDef;
@ -959,18 +970,19 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
String fieldType = o instanceof String ? o.toString() : null;
// do not recurse into our custom field mapper
if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) {
checkMapping(index, type, path, key, child, fields);
checkMapping(index, path, key, child, fields);
}
} else if ("type".equals(key)) {
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder);
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
SearchResponse searchResponse = searchRequestBuilder.setSize(0)
.setIndices(index)
.setTypes(type)
.setQuery(queryBuilder)
.execute()
.actionGet();
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(queryBuilder)
.size(0);
SearchRequest searchRequest = new SearchRequest()
.indices(index)
.source(builder);
SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
fields.put(path, searchResponse.getHits().getTotalHits());
}
}
@ -978,7 +990,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
private static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) {
Map<K, V> result = new LinkedHashMap<>();
map.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getValue))
map.entrySet().stream().sorted(Map.Entry.comparingByValue())
.forEachOrdered(e -> result.put(e.getKey(), e.getValue()));
return result;
}
@ -1062,6 +1074,42 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
}
}
private static class FailPruneResult implements IndexPruneResult {
List<String> candidateIndices;
List<String> indicesToDelete;
DeleteIndexResponse response;
FailPruneResult(List<String> candidateIndices, List<String> indicesToDelete,
DeleteIndexResponse response) {
this.candidateIndices = candidateIndices;
this.indicesToDelete = indicesToDelete;
this.response = response;
}
@Override
public IndexPruneResult.State getState() {
return IndexPruneResult.State.FAIL;
}
@Override
public List<String> getCandidateIndices() {
return candidateIndices;
}
@Override
public List<String> getDeletedIndices() {
return indicesToDelete;
}
@Override
public boolean isAcknowledged() {
return response.isAcknowledged();
}
}
private static class NothingToDoPruneResult implements IndexPruneResult {
List<String> candidateIndices;

View file

@ -1,5 +1,8 @@
package org.xbib.elx.common;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
@ -16,6 +19,8 @@ import java.util.ServiceLoader;
@SuppressWarnings("rawtypes")
public class ClientBuilder {
private static final Logger logger = LogManager.getLogger(ClientBuilder.class);
private final ElasticsearchClient client;
private final Settings.Builder settingsBuilder;
@ -97,6 +102,10 @@ public class ClientBuilder {
if (provider == null) {
throw new IllegalArgumentException("no provider");
}
return (C) providerMap.get(provider).getExtendedClient().setClient(client).init(settingsBuilder.build());
Settings settings = settingsBuilder.build();
logger.log(Level.INFO, "settings = " + settings.toDelimitedString(','));
return (C) providerMap.get(provider).getExtendedClient()
.setClient(client)
.init(settings);
}
}

View file

@ -2,9 +2,6 @@ package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -12,6 +9,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.ExtendedClient;
@ -33,27 +31,23 @@ public class DefaultBulkController implements BulkController {
private final BulkMetric bulkMetric;
private BulkProcessor bulkProcessor;
private final List<String> indexNames;
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals;
private long maxWaitTime;
private final long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private final TimeUnit maxWaitTimeUnit;
private BulkProcessor bulkProcessor;
private final AtomicBoolean active;
private BulkListener bulkListener;
private AtomicBoolean active;
private boolean enableBulkLogging;
public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
public DefaultBulkController(ExtendedClient client) {
this.client = client;
this.bulkMetric = bulkMetric;
this.bulkMetric = new DefaultBulkMetric();
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
@ -62,9 +56,14 @@ public class DefaultBulkController implements BulkController {
this.maxWaitTimeUnit = TimeUnit.SECONDS;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public Throwable getLastBulkError() {
return bulkListener.getLastBulkError();
return bulkProcessor.getBulkListener().getLastBulkError();
}
@Override
@ -78,22 +77,27 @@ public class DefaultBulkController implements BulkController {
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
"maxVolumePerRequest"));
this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
Parameters.ENABLE_BULK_LOGGING.getValue());
this.bulkListener = new BulkListener();
BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest)
.build();
this.active.set(true);
if (logger.isInfoEnabled()) {
logger.info("bulk processor set up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {}, bulk logging = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest,
enableBulkLogging);
}
this.active.set(true);
}
@Override
public void inactivate() {
this.active.set(false);
}
@Override
@ -118,65 +122,53 @@ public class DefaultBulkController implements BulkController {
}
@Override
public void index(IndexRequest indexRequest) {
public void bulkIndex(IndexRequest indexRequest) {
ensureActiveAndBulk();
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkProcessor.add(indexRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of index failed: " + e.getMessage(), e);
}
inactivate();
}
}
@Override
public void delete(DeleteRequest deleteRequest) {
public void bulkDelete(DeleteRequest deleteRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
}
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
bulkProcessor.add(deleteRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of delete failed: " + e.getMessage(), e);
}
inactivate();
}
}
@Override
public void update(UpdateRequest updateRequest) {
public void bulkUpdate(UpdateRequest updateRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
}
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
}
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
bulkProcessor.add(updateRequest);
} catch (Exception e) {
bulkListener.lastBulkError = e;
active.set(false);
if (logger.isErrorEnabled()) {
logger.error("bulk add of update failed: " + e.getMessage(), e);
}
inactivate();
}
}
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
try {
return bulkProcessor.awaitFlush(timeout, timeUnit);
} catch (InterruptedException e) {
@ -195,7 +187,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
flush();
if (waitForResponses(timeout, timeUnit)) {
if (waitForBulkResponses(timeout, timeUnit)) {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
@ -217,6 +209,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void close() throws IOException {
flush();
bulkMetric.close();
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
@ -238,92 +231,5 @@ public class DefaultBulkController implements BulkController {
if (bulkProcessor == null) {
throw new UnsupportedOperationException("bulk processor not present");
}
if (bulkListener == null) {
throw new UnsupportedOperationException("bulk listener not present");
}
}
private class BulkListener implements DefaultBulkProcessor.Listener {
private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
private Throwable lastBulkError = null;
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
}
if (enableBulkLogging && logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) {
n++;
if (bulkMetric != null) {
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
}
if (enableBulkLogging && bulkMetric != null && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
if (n > 0) {
if (enableBulkLogging && logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
}
} else {
if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
lastBulkError = failure;
active.set(false);
if (enableBulkLogging && logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);
}
}
Throwable getLastBulkError() {
return lastBulkError;
}
}
}
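To show the constructor change in use (a sketch, not part of the commit): DefaultBulkController now creates its own DefaultBulkMetric internally, so callers pass only the client; the variable names and the empty settings are illustrative.

// hypothetical wiring after this change, assuming an initialized ExtendedClient
BulkController controller = new DefaultBulkController(extendedClient); // no BulkMetric argument any more
controller.init(Settings.EMPTY);                                       // falls back to the Parameters defaults
BulkMetric metric = controller.getBulkMetric();                        // the metric is now reached via the controller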

View file

@ -0,0 +1,109 @@
package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
class DefaultBulkListener implements BulkListener {
private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
private final BulkController bulkController;
private final BulkMetric bulkMetric;
private final boolean isBulkLoggingEnabled;
private Throwable lastBulkError = null;
public DefaultBulkListener(BulkController bulkController,
BulkMetric bulkMetric,
boolean isBulkLoggingEnabled) {
this.bulkController = bulkController;
this.bulkMetric = bulkMetric;
this.isBulkLoggingEnabled = isBulkLoggingEnabled;
}
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().inc();
int n = request.numberOfActions();
bulkMetric.getSubmitted().inc(n);
bulkMetric.getCurrentIngestNumDocs().inc(n);
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
}
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
executionId,
request.numberOfActions(),
request.estimatedSizeInBytes(),
l);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
}
if (itemResponse.isFailed()) {
n++;
if (bulkMetric != null) {
bulkMetric.getSucceeded().dec(1);
bulkMetric.getFailed().inc(1);
}
}
}
if (isBulkLoggingEnabled && bulkMetric != null && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
response.getTook().millis(),
l);
}
if (n > 0) {
if (isBulkLoggingEnabled && logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
}
} else {
if (bulkMetric != null) {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec();
}
lastBulkError = failure;
if (logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);
}
bulkController.inactivate();
}
@Override
public Throwable getLastBulkError() {
return lastBulkError;
}
}

View file

@ -1,6 +1,5 @@
package org.xbib.elx.common;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
@ -37,10 +36,6 @@ public class DefaultBulkMetric implements BulkMetric {
submitted = new CountMetric();
succeeded = new CountMetric();
failed = new CountMetric();
}
@Override
public void init(Settings settings) {
start();
}

View file

@ -11,7 +11,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.BulkRequestHandler;
import java.util.Objects;
import java.util.concurrent.Executors;
@ -29,6 +31,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DefaultBulkProcessor implements BulkProcessor {
private final BulkListener bulkListener;
private final int bulkActions;
private final long bulkSize;
@ -45,16 +49,22 @@ public class DefaultBulkProcessor implements BulkProcessor {
private volatile boolean closed;
private DefaultBulkProcessor(ElasticsearchClient client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
private DefaultBulkProcessor(ElasticsearchClient client,
BulkListener bulkListener,
String name,
int concurrentRequests,
int bulkActions,
ByteSizeValue bulkSize,
TimeValue flushInterval) {
this.bulkListener = bulkListener;
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
new SyncBulkRequestHandler(client, listener) :
new AsyncBulkRequestHandler(client, listener, concurrentRequests);
new SyncBulkRequestHandler(client, bulkListener) :
new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests);
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
EsExecutors.daemonThreadFactory(name != null ? "[" + name + "]" : "" + "bulk_processor"));
@ -68,12 +78,18 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
public static Builder builder(ElasticsearchClient client, Listener listener) {
public static Builder builder(ElasticsearchClient client,
BulkListener listener) {
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null");
return new Builder(client, listener);
}
@Override
public BulkListener getBulkListener() {
return bulkListener;
}
/**
* Wait for bulk request handler with flush.
* @param timeout the timeout value
@ -136,20 +152,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
@SuppressWarnings("rawtypes")
@Override
public DefaultBulkProcessor add(ActionRequest request) {
return add(request, null);
}
/**
* Adds either a delete or an index request with a payload.
*
* @param request request
* @param payload payload
* @return his bulk processor
*/
@SuppressWarnings("rawtypes")
@Override
public DefaultBulkProcessor add(ActionRequest request, Object payload) {
internalAdd(request, payload);
internalAdd(request);
return this;
}
@ -183,9 +186,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
}
private synchronized void internalAdd(ActionRequest<?> request, Object payload) {
private synchronized void internalAdd(ActionRequest<?> request) {
ensureOpen();
bulkRequest.add(request, payload);
bulkRequest.add(request);
executeIfNeeded();
}
@ -217,7 +220,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
private final Listener listener;
private final BulkListener bulkListener;
private String name;
@ -233,12 +236,12 @@ public class DefaultBulkProcessor implements BulkProcessor {
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
*
* @param client the client
* @param listener the listener
* @param client the client
* @param bulkListener the listener
*/
Builder(ElasticsearchClient client, Listener listener) {
Builder(ElasticsearchClient client, BulkListener bulkListener) {
this.client = client;
this.listener = listener;
this.bulkListener = bulkListener;
}
/**
@ -308,7 +311,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
* @return a bulk processor
*/
public DefaultBulkProcessor build() {
return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
@ -332,10 +335,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
private final DefaultBulkProcessor.Listener listener;
private final BulkListener listener;
SyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener) {
Objects.requireNonNull(listener, "A listener is required for SyncBulkRequestHandler but null");
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener) {
this.client = client;
this.listener = listener;
}
@ -365,14 +367,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
private final ElasticsearchClient client;
private final DefaultBulkProcessor.Listener listener;
private final BulkListener listener;
private final Semaphore semaphore;
private final int concurrentRequests;
private AsyncBulkRequestHandler(ElasticsearchClient client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
Objects.requireNonNull(listener, "A listener is required for AsyncBulkRequestHandler but null");
private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener, int concurrentRequests) {
this.client = client;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
@ -413,7 +414,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
} catch (Exception e) {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
if (!bulkRequestSetupSuccessful && acquired) {
// if we fail on client.bulk() release the semaphore
semaphore.release();
}
}

View file

@ -29,7 +29,7 @@ public class MockExtendedClient extends AbstractExtendedClient {
}
@Override
protected void closeClient() {
protected void closeClient() {
}
@Override

View file

@ -52,7 +52,8 @@ class AliasTest {
// get alias
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY);
long t0 = System.nanoTime();
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
GetAliasesResponse getAliasesResponse =
client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
long t1 = (System.nanoTime() - t0) / 1000000;
logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
assertTrue(t1 >= 0);

View file

@ -33,7 +33,7 @@ class SearchTest {
ElasticsearchClient client = helper.client("1");
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1; i++) {
IndexRequest indexRequest = new IndexRequest("pages", "row")
IndexRequest indexRequest = new IndexRequest().index("pages").type("row")
.source(XContentFactory.jsonBuilder()
.startObject()
.field("user1", "joerg")

View file

@ -30,7 +30,7 @@ class SimpleTest {
}
@Test
void test() throws Exception {
void testSimple() throws Exception {
try {
DeleteIndexRequest deleteIndexRequest =
new DeleteIndexRequest().indices("test");
@ -39,7 +39,6 @@ class SimpleTest {
// ignore if index not found
}
Settings indexSettings = Settings.settingsBuilder()
.put(helper.getNodeSettings())
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")

View file

@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import java.util.concurrent.atomic.AtomicInteger;
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
@ -47,20 +46,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private String home;
private String cluster;
private String host;
private int port;
private static final String key = "es-instance";
private static final AtomicInteger count = new AtomicInteger(0);
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@ -73,20 +62,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
setHome(System.getProperty("path.home") + "/" + getRandomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
// initialize new helper here, increase counter
return extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
logger.info("starting cluster");
public void beforeEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress();
String host = null;
int port = 0;
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
@ -111,42 +103,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
closeNodes();
deleteFiles(Paths.get(getHome() + "/data"));
public void afterEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
closeNodes(helper);
deleteFiles(Paths.get(helper.getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
private void setClusterName(String cluster) {
this.cluster = cluster;
}
private String getClusterName() {
return cluster;
}
private void setHome(String home) {
this.home = home;
}
private String getHome() {
return home;
}
private void closeNodes() {
private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
for (AbstractClient client : helper.clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
@ -168,23 +144,42 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
}
private String getRandomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Helper create() {
return new Helper();
Helper helper = new Helper();
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
helper.setClusterName("test-cluster-" + helper.randomString(8));
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
return helper;
}
class Helper {
static class Helper {
String home;
String cluster;
Map<String, Node> nodes = new HashMap<>();
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) {
this.home = home;
}
String getHome() {
return home;
}
void setClusterName(String cluster) {
this.cluster = cluster;
}
String getClusterName() {
return cluster;
}
Settings getNodeSettings() {
return settingsBuilder()
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
@@ -198,20 +193,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return clients.get(id);
}
String randomString(int n) {
return getRandomString(n);
String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
.put("name", id)
.put("node.name", id)
.build();
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
logger.info("clients={}", clients);
return node;
}
}
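For orientation, a minimal sketch (not part of this commit) of how a test class can consume this extension: the Helper built in create() is injected through resolveParameter(), so a test in the same package can reach the started node by id. The class name ExampleTest, the constructor-injection style, and the assumption that supportsParameter() accepts Helper-typed parameters are all hypothetical.
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
// Hypothetical consumer of TestExtension (illustration only).
@ExtendWith(TestExtension.class)
class ExampleTest {
    private final TestExtension.Helper helper;
    // resolveParameter() stores and returns the per-test Helper
    ExampleTest(TestExtension.Helper helper) {
        this.helper = helper;
    }
    @Test
    void clusterIsReachable() {
        // beforeEach() has already started node "1" and registered its client
        assertNotNull(helper.client("1"));
        assertTrue(helper.getClusterName().startsWith("test-cluster-"));
    }
}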

View file

@@ -45,18 +45,19 @@ class WildcardTest {
}
private void index(ElasticsearchClient client, String id, String fieldValue) throws IOException {
client.execute(IndexAction.INSTANCE, new IndexRequest("index", "type", id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())).actionGet();
client.execute(IndexAction.INSTANCE, new IndexRequest().index("index").type("type").id(id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject()))
.actionGet();
client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet();
}
private long count(ElasticsearchClient client, QueryBuilder queryBuilder) {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(queryBuilder);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("index");
searchRequest.types("type");
searchRequest.source(builder);
SearchSourceBuilder builder = new SearchSourceBuilder()
.query(queryBuilder);
SearchRequest searchRequest = new SearchRequest()
.indices("index")
.types("type")
.source(builder);
return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
}
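As a usage note, the two helpers above compose as sketched below; the sample field value, the wildcard pattern, and the client wiring are assumptions for illustration, not part of the commit.
// Hypothetical end-to-end use of index() and count() inside this test class;
// assumes a static JUnit assertEquals import and org.elasticsearch.index.query.QueryBuilders.
private void indexAndCount(ElasticsearchClient client) throws IOException {
    index(client, "1", "auf den mond");
    // the wildcard query should match the single document just indexed
    assertEquals(1, count(client, QueryBuilders.wildcardQuery("field", "*mond*")));
}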

View file

@@ -1 +1 @@
package org.xbib.elx.common.test;
package org.xbib.elx.common.test;

View file

@@ -94,7 +94,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -122,7 +122,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -180,7 +180,7 @@ class ClientTest {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test5", 60L, TimeUnit.SECONDS);
assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(maxthreads * actions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
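The assertions above follow the API change carried by this commit: the bulk metric counters and the last bulk error are reached through the BulkController instead of directly on the client. A small hedged helper could read as follows; the import path of BulkController and the helper itself are assumptions.
import org.apache.logging.log4j.Logger;
import org.xbib.elx.api.BulkController; // package path assumed
// Hypothetical helper (not part of this commit) for summarizing a bulk run.
static void logBulkOutcome(BulkController controller, Logger logger) {
    long succeeded = controller.getBulkMetric().getSucceeded().getCount();
    long failed = controller.getBulkMetric().getFailed().getCount();
    logger.info("bulk succeeded={} failed={}", succeeded, failed);
    if (controller.getLastBulkError() != null) {
        logger.error("bulk error", controller.getLastBulkError());
    }
}
A test would call it as logBulkOutcome(client.getBulkController(), logger) in its finally block.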

View file

@@ -63,7 +63,7 @@ class DuplicateIDTest {
logger.warn("skipping, no node available");
} finally {
client.close();
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}

View file

@@ -37,7 +37,7 @@ class SmokeTest {
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
assertEquals(helper.getCluster(), client.getClusterName());
assertEquals(helper.getClusterName(), client.getClusterName());
client.checkMapping("test_smoke");
client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
client.flush();
@@ -54,8 +54,8 @@ class SmokeTest {
int replica = client.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
assertEquals(0, client.getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
assertEquals(0, client.getBulkController().getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkController().getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import java.util.concurrent.atomic.AtomicInteger;
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
@@ -47,20 +46,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private String home;
private String cluster;
private String host;
private int port;
private static final String key = "es-instance";
private static final AtomicInteger count = new AtomicInteger(0);
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@@ -73,20 +62,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
setHome(System.getProperty("path.home") + "/" + getRandomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create());
// initialize new helper here, increase counter
return extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
Helper helper = context.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
logger.info("starting cluster");
public void beforeEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1").execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
.publishAddress();
String host = null;
int port = 0;
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
@@ -111,42 +103,26 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
closeNodes();
deleteFiles(Paths.get(getHome() + "/data"));
public void afterEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
closeNodes(helper);
deleteFiles(Paths.get(helper.getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
private void setClusterName(String cluster) {
this.cluster = cluster;
}
private String getClusterName() {
return cluster;
}
private void setHome(String home) {
this.home = home;
}
private String getHome() {
return home;
}
private void closeNodes() {
private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
for (AbstractClient client : helper.clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
@@ -168,20 +144,46 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
}
private String getRandomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Helper create() {
return new Helper();
Helper helper = new Helper();
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
helper.setClusterName("test-cluster-" + helper.randomString(8));
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
return helper;
}
class Helper {
static class Helper {
String home;
String cluster;
Map<String, Node> nodes = new HashMap<>();
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) {
this.home = home;
}
String getHome() {
return home;
}
void setClusterName(String cluster) {
this.cluster = cluster;
}
String getClusterName() {
return cluster;
}
Settings getNodeSettings() {
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
}
void startNode(String id) {
buildNode(id).start();
@@ -191,25 +193,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return clients.get(id);
}
String getCluster() {
return getClusterName();
}
String randomString(int n) {
return getRandomString(n);
String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.put("name", id)
Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
.put("node.name", id)
.build();
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
logger.info("clients={}", clients);
return node;
}
}

View file

@@ -97,7 +97,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -126,7 +126,7 @@ class ClientTest {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@@ -183,7 +183,7 @@ class ClientTest {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test5", 60L, TimeUnit.SECONDS);
assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount());
assertEquals(maxthreads * maxloop, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}

View file

@@ -64,7 +64,7 @@ class DuplicateIDTest {
logger.warn("skipping, no node available");
} finally {
client.close();
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}

View file

@@ -38,7 +38,7 @@ class SmokeTest extends TestExtension {
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flush();
client.waitForResponses(30, TimeUnit.SECONDS);
assertEquals(helper.getCluster(), client.getClusterName());
assertEquals(helper.getClusterName(), client.getClusterName());
client.checkMapping("test_smoke");
client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
client.flush();
@@ -56,8 +56,8 @@ class SmokeTest extends TestExtension {
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
assertEquals(0, client.getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
assertEquals(0, client.getBulkController().getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkController().getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@@ -36,8 +36,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import java.util.concurrent.atomic.AtomicInteger;
public class TestExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
@@ -47,19 +46,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private static final String key = "es-instance";
private Map<String, AbstractClient> clients = new HashMap<>();
private String home;
private String cluster;
private String host;
private int port;
private static final String key = "es-test-instance";
private static final AtomicInteger count = new AtomicInteger(0);
private static final ExtensionContext.Namespace ns =
ExtensionContext.Namespace.create(TestExtension.class);
@@ -73,15 +62,16 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
setHome(System.getProperty("path.home") + "/" + getRandomString(8));
setClusterName("test-cluster-" + System.getProperty("user.name"));
return extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
// initialize new helper here, increase counter
return extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
}
@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key, key -> create(), Helper.class);
logger.info("starting cluster");
Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
helper.startNode("1");
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = helper.client("1").execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
@@ -89,8 +79,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.publishAddress();
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
port = address.address().getPort();
helper.host = address.address().getHostName();
helper.port = address.address().getPort();
}
try {
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
@@ -107,46 +97,30 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
ClusterStateResponse clusterStateResponse =
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
logger.info("host = {} port = {}", host, port);
logger.info("host = {} port = {}", helper.host, helper.port);
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
closeNodes();
deleteFiles(Paths.get(getHome() + "/data"));
logger.info("data files wiped: " + getHome());
public void afterEach(ExtensionContext extensionContext) throws Exception {
Helper helper = extensionContext.getParent().get().getStore(ns)
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
closeNodes(helper);
deleteFiles(Paths.get(helper.getHome() + "/data"));
logger.info("data files wiped");
Thread.sleep(2000L); // let OS commit changes
}
private void setClusterName(String cluster) {
this.cluster = cluster;
}
private String getClusterName() {
return cluster;
}
private void setHome(String home) {
this.home = home;
}
private String getHome() {
return home;
}
private void closeNodes() {
private void closeNodes(Helper helper) throws IOException {
logger.info("closing all clients");
for (AbstractClient client : clients.values()) {
for (AbstractClient client : helper.clients.values()) {
client.close();
}
clients.clear();
logger.info("closing all nodes");
for (Node node : nodes.values()) {
for (Node node : helper.nodes.values()) {
if (node != null) {
node.close();
}
}
nodes.clear();
logger.info("all nodes closed");
}
@@ -168,34 +142,57 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
}
}
private String getRandomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Helper create() {
return new Helper();
Helper helper = new Helper();
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
helper.setClusterName("test-cluster-" + helper.randomString(8));
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
return helper;
}
class Helper {
static class Helper {
String home;
String cluster;
String host;
int port;
Map<String, Node> nodes = new HashMap<>();
Map<String, AbstractClient> clients = new HashMap<>();
void setHome(String home) {
this.home = home;
}
String getHome() {
return home;
}
void setClusterName(String cluster) {
this.cluster = cluster;
}
String getClusterName() {
return cluster;
}
Settings getNodeSettings() {
return settingsBuilder()
return Settings.builder()
.put("cluster.name", getClusterName())
.put("path.home", getHome())
.build();
}
Settings getTransportSettings() {
return settingsBuilder()
return Settings.builder()
.put("cluster.name", cluster)
.put("path.home", getHome())
.put("host", host)
.put("port", port)
.put("cluster.name", getClusterName())
.put("path.home", getHome() + "/transport")
.build();
}
@@ -207,24 +204,24 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
return clients.get(id);
}
String getCluster() {
return getClusterName();
}
String randomString(int n) {
return getRandomString(n);
String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
}
return new String(buf);
}
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
Settings nodeSettings = Settings.builder()
.put(getNodeSettings())
.put("name", id)
.put("node.name", id)
.build();
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);
logger.info("clients={}", clients);
return node;
}
}
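Since this Helper records host and port in beforeEach() and exposes getTransportSettings(), a transport-side test could connect a stock Elasticsearch 2.x TransportClient as sketched below. The connect() method and its placement are assumptions; only the Helper fields shown above and the standard ES 2.x client API are taken as given.
import java.net.InetAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
// Hypothetical sketch (not part of this commit): wire a TransportClient to the
// test node using the host/port captured by the extension.
static TransportClient connect(TestExtension.Helper helper) throws Exception {
    return TransportClient.builder()
            .settings(helper.getTransportSettings())
            .build()
            .addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName(helper.host), helper.port));
}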

View file

@@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 2.2.1.15
version = 2.2.1.16
# main
xbib-metrics.version = 2.0.0
@@ -8,7 +8,7 @@ xbib-metrics.version = 2.0.0
xbib-guice.version = 4.4.2
# guava 18 -> our guava 28.1
xbib-guava.version = 28.1
xbib-netty-http.version = 4.1.48.0
xbib-netty-http.version = 4.1.49.0
elasticsearch.version = 2.2.1
#jackson 2.6.7 (original ES version) -> 2.9.10
jackson.version = 2.9.10
@@ -17,8 +17,5 @@ log4j.version = 2.12.1
mustache.version = 0.9.5
jts.version = 1.13
# test
junit.version = 5.5.1
# docs
junit.version = 5.6.2
asciidoclet.version = 1.5.4