update to latest api
This commit is contained in:
parent
610e170778
commit
bca70b7dd3
51 changed files with 772 additions and 567 deletions
|
@ -26,6 +26,8 @@ public interface AdminClient extends BasicClient {
|
|||
|
||||
Map<String, ?> getMapping(String index, String type) throws IOException;
|
||||
|
||||
void checkMapping(String index);
|
||||
|
||||
/**
|
||||
* Delete an index.
|
||||
* @param indexDefinition the index definition
|
||||
|
|
|
@ -13,12 +13,6 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public interface BulkClient extends BasicClient, Flushable {
|
||||
|
||||
/**
|
||||
* Get bulk metric.
|
||||
* @return the bulk metric
|
||||
*/
|
||||
BulkMetric getBulkMetric();
|
||||
|
||||
/**
|
||||
* Get buulk control.
|
||||
* @return the bulk control
|
||||
|
|
|
@ -36,4 +36,5 @@ public interface BulkController extends Closeable, Flushable {
|
|||
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
|
||||
|
||||
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ public interface BulkMetric extends Closeable {
|
|||
|
||||
void init(Settings settings);
|
||||
|
||||
void markTotalIngest(long n);
|
||||
|
||||
Metered getTotalIngest();
|
||||
|
||||
Count getTotalIngestSizeInBytes();
|
||||
|
|
|
@ -8,8 +8,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public interface BulkProcessor extends Closeable, Flushable {
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
BulkProcessor add(ActionRequest request);
|
||||
BulkProcessor add(ActionRequest<?> request);
|
||||
|
||||
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import java.util.Collection;
|
|||
|
||||
public interface IndexPruneResult {
|
||||
|
||||
enum State { NOTHING_TO_DO, SUCCESS, NONE, FAIL };
|
||||
enum State { NOTHING_TO_DO, SUCCESS, NONE };
|
||||
|
||||
State getState();
|
||||
|
||||
|
|
|
@ -14,6 +14,8 @@ import java.util.stream.Stream;
|
|||
|
||||
public interface SearchClient extends BasicClient {
|
||||
|
||||
SearchMetric getSearchMetric();
|
||||
|
||||
Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder);
|
||||
|
||||
Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder);
|
||||
|
|
29
elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java
Normal file
29
elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java
Normal file
|
@ -0,0 +1,29 @@
|
|||
package org.xbib.elx.api;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
import java.io.Closeable;
|
||||
|
||||
public interface SearchMetric extends Closeable {
|
||||
|
||||
void init(Settings settings);
|
||||
|
||||
void markTotalQueries(long n);
|
||||
|
||||
Metered getTotalQueries();
|
||||
|
||||
Count getCurrentQueries();
|
||||
|
||||
Count getQueries();
|
||||
|
||||
Count getSucceededQueries();
|
||||
|
||||
Count getEmptyQueries();
|
||||
|
||||
long elapsed();
|
||||
|
||||
void start();
|
||||
|
||||
void stop();
|
||||
}
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
|
|||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.cluster.metadata.AliasAction;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
|
@ -365,7 +366,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
|
||||
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
|
||||
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
|
||||
logger.info("{} indices", getIndexResponse.getIndices().length);
|
||||
logger.info("found {} indices", getIndexResponse.getIndices().length);
|
||||
List<String> candidateIndices = new ArrayList<>();
|
||||
for (String s : getIndexResponse.getIndices()) {
|
||||
Matcher m = pattern.matcher(s);
|
||||
|
@ -459,7 +460,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
@Override
|
||||
public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings)
|
||||
throws IOException {
|
||||
boolean isEnabled = settings.getAsBoolean("enabled", !(client instanceof MockAdminClient));
|
||||
boolean isEnabled = settings.getAsBoolean("enabled", false);
|
||||
String indexName = settings.get("name", index);
|
||||
String fullIndexName;
|
||||
String dateTimePattern = settings.get("dateTimePattern");
|
||||
|
@ -504,6 +505,22 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkMapping(String index) {
|
||||
ensureClientIsPresent();
|
||||
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
|
||||
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
|
||||
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
|
||||
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
|
||||
ImmutableOpenMap<String, MappingMetaData> mappings = map.get(stringObjectCursor.value);
|
||||
for (ObjectObjectCursor<String, MappingMetaData> cursor : mappings) {
|
||||
String mappingName = cursor.key;
|
||||
MappingMetaData mappingMetaData = cursor.value;
|
||||
checkMapping(index, mappingName, mappingMetaData);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static String findSettingsFrom(String string) throws IOException {
|
||||
if (string == null) {
|
||||
return null;
|
||||
|
@ -563,32 +580,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
return result;
|
||||
}
|
||||
|
||||
public void checkMapping(String index) {
|
||||
ensureClientIsPresent();
|
||||
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
|
||||
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
|
||||
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
|
||||
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
|
||||
ImmutableOpenMap<String, MappingMetaData> mappings = map.get(stringObjectCursor.value);
|
||||
for (ObjectObjectCursor<String, MappingMetaData> cursor : mappings) {
|
||||
String mappingName = cursor.key;
|
||||
MappingMetaData mappingMetaData = cursor.value;
|
||||
checkMapping(index, mappingName, mappingMetaData);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void checkMapping(String index, String type, MappingMetaData mappingMetaData) {
|
||||
try {
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder();
|
||||
builder.query(QueryBuilders.matchAllQuery());
|
||||
builder.size(0);
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices(index);
|
||||
searchRequest.types(type);
|
||||
searchRequest.source(builder);
|
||||
SearchResponse searchResponse =
|
||||
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||
.setIndices(index)
|
||||
.setTypes(type)
|
||||
.setQuery(QueryBuilders.matchAllQuery())
|
||||
.setSize(0);
|
||||
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
|
||||
long total = searchResponse.getHits().getTotalHits();
|
||||
if (total > 0L) {
|
||||
Map<String, Long> fields = new TreeMap<>();
|
||||
|
@ -644,15 +643,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
} else if ("type".equals(key)) {
|
||||
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);
|
||||
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(filterBuilder);
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder();
|
||||
builder.query(queryBuilder);
|
||||
builder.size(0);
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices(index);
|
||||
searchRequest.types(type);
|
||||
searchRequest.source(builder);
|
||||
SearchResponse searchResponse =
|
||||
client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||
.setIndices(index)
|
||||
.setTypes(type)
|
||||
.setQuery(queryBuilder)
|
||||
.setSize(0);
|
||||
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
|
||||
fields.put(path, searchResponse.getHits().getTotalHits());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
|
||||
@Override
|
||||
public void setClient(ElasticsearchClient client) {
|
||||
logger.log(Level.INFO, "setting client = " + client);
|
||||
logger.log(Level.DEBUG, "setting client = " + client);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
|
@ -65,8 +65,6 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
||||
this.settings = settings;
|
||||
setClient(createClient(settings));
|
||||
} else {
|
||||
logger.log(Level.WARN, "not initializing");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,23 +93,24 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
ensureClientIsPresent();
|
||||
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
.timeout(timeout)
|
||||
.waitForStatus(status);
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name();
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error(message);
|
||||
}
|
||||
logger.error(message);
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureClientIsPresent();
|
||||
logger.info("waiting for cluster shard settling");
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
//.waitForActiveShards(0)
|
||||
.waitForRelocatingShards(0)
|
||||
.timeout(timeout);
|
||||
ClusterHealthResponse healthResponse =
|
||||
|
@ -194,9 +193,6 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
}
|
||||
|
||||
protected void ensureClientIsPresent() {
|
||||
if (this instanceof MockAdminClient) {
|
||||
return;
|
||||
}
|
||||
if (client == null) {
|
||||
throw new IllegalStateException("no client");
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ import org.apache.logging.log4j.Level;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushAction;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
|
@ -17,12 +17,9 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.xbib.elx.api.BulkClient;
|
||||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -34,8 +31,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName());
|
||||
|
||||
private BulkMetric bulkMetric;
|
||||
|
||||
private BulkController bulkController;
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(true);
|
||||
|
@ -44,21 +39,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
public void init(Settings settings) throws IOException {
|
||||
if (closed.compareAndSet(true, false)) {
|
||||
super.init(settings);
|
||||
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
||||
bulkMetric = new DefaultBulkMetric();
|
||||
bulkMetric.init(settings);
|
||||
bulkController = new DefaultBulkController(this, bulkMetric);
|
||||
bulkController = new DefaultBulkController(this);
|
||||
logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(','));
|
||||
bulkController.init(settings);
|
||||
} else {
|
||||
logger.log(Level.WARN, "not initializing");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkMetric getBulkMetric() {
|
||||
return bulkMetric;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkController getBulkController() {
|
||||
return bulkController;
|
||||
|
@ -75,10 +61,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
public void close() throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
ensureClientIsPresent();
|
||||
if (bulkMetric != null) {
|
||||
logger.info("closing bulk metric");
|
||||
bulkMetric.close();
|
||||
}
|
||||
if (bulkController != null) {
|
||||
logger.info("closing bulk controller");
|
||||
bulkController.close();
|
||||
|
@ -98,40 +80,50 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public void newIndex(String index) throws IOException {
|
||||
newIndex(index, Settings.EMPTY, (Map<String, ?>) null);
|
||||
newIndex(index, Settings.EMPTY, (XContentBuilder) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings) throws IOException {
|
||||
newIndex(index, settings, (Map<String, ?>) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
|
||||
String mappingString = builder.string();
|
||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(mappingString).mapOrdered();
|
||||
newIndex(index, settings, mappings);
|
||||
newIndex(index, settings, (XContentBuilder) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
|
||||
if (mapping == null || mapping.isEmpty()) {
|
||||
newIndex(index, settings, (XContentBuilder) null);
|
||||
} else {
|
||||
newIndex(index, settings, JsonXContent.contentBuilder().map(mapping));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
|
||||
if (index == null) {
|
||||
logger.warn("no index name given to create index");
|
||||
logger.warn("unable to create index, no index name given");
|
||||
return;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
|
||||
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
|
||||
.setIndex(index);
|
||||
if (settings != null) {
|
||||
createIndexRequest.settings(settings);
|
||||
createIndexRequestBuilder.setSettings(settings);
|
||||
}
|
||||
if (mapping != null) {
|
||||
createIndexRequest.mapping(TYPE_NAME, mapping);
|
||||
if (builder != null) {
|
||||
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
|
||||
logger.debug("adding mapping = {}", builder.string());
|
||||
} else {
|
||||
// empty mapping
|
||||
createIndexRequestBuilder.addMapping(TYPE_NAME,
|
||||
JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject());
|
||||
logger.debug("empty mapping");
|
||||
}
|
||||
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
|
||||
if (createIndexResponse.isAcknowledged()) {
|
||||
logger.info("index {} created", index);
|
||||
} else {
|
||||
logger.warn("index creation of {} not acknowledged", index);
|
||||
}
|
||||
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
logger.info("index {} created: {}", index,
|
||||
createIndexResponse.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -142,36 +134,40 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
@Override
|
||||
public void startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
|
||||
throws IOException {
|
||||
if (bulkController != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopBulk(IndexDefinition indexDefinition) throws IOException {
|
||||
if (bulkController != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.stopBulkMode(indexDefinition);
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
bulkController.stopBulkMode(indexDefinition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
|
||||
if (bulkController != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.stopBulkMode(index, timeout, timeUnit);
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
bulkController.stopBulkMode(index, timeout, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkClient index(String index, String id, boolean create, String source) {
|
||||
return index(index, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
|
||||
return index(new IndexRequest()
|
||||
.index(index)
|
||||
.type(TYPE_NAME)
|
||||
.id(id)
|
||||
.create(create)
|
||||
.source(source)); // will be converted into a bytes reference
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkClient index(String index, String id, boolean create, BytesReference source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source));
|
||||
return index(new IndexRequest()
|
||||
.index(index)
|
||||
.type(TYPE_NAME)
|
||||
.id(id)
|
||||
.create(create)
|
||||
.source(source));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -183,7 +179,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient delete(String index, String id) {
|
||||
return delete(new DeleteRequest(index, TYPE_NAME, id));
|
||||
return delete(new DeleteRequest()
|
||||
.index(index)
|
||||
.type(TYPE_NAME)
|
||||
.id(id));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -194,15 +193,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public BulkClient update(String index, String id, BytesReference source) {
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id)
|
||||
.doc(source, XContentType.JSON));
|
||||
public BulkClient update(String index, String id, String source) {
|
||||
return update(index, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkClient update(String index, String id, String source) {
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id)
|
||||
.doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
|
||||
public BulkClient update(String index, String id, BytesReference source) {
|
||||
return update(new UpdateRequest()
|
||||
.index(index)
|
||||
.type(TYPE_NAME)
|
||||
.id(id)
|
||||
.doc(source.hasArray() ? source.array() : source.toBytes()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.get.GetAction;
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
|
@ -15,9 +16,12 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||
import org.elasticsearch.action.search.SearchScrollAction;
|
||||
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.xbib.elx.api.SearchClient;
|
||||
import org.xbib.elx.api.SearchMetric;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Optional;
|
||||
|
@ -30,11 +34,43 @@ import java.util.stream.StreamSupport;
|
|||
|
||||
public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient {
|
||||
|
||||
private SearchMetric searchMetric;
|
||||
|
||||
@Override
|
||||
public SearchMetric getSearchMetric() {
|
||||
return searchMetric;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) throws IOException {
|
||||
super.init(settings);
|
||||
this.searchMetric = new DefaultSearchMetric();
|
||||
searchMetric.init(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (searchMetric != null) {
|
||||
searchMetric.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
|
||||
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE);
|
||||
getRequestBuilderConsumer.accept(getRequestBuilder);
|
||||
GetResponse getResponse = getRequestBuilder.execute().actionGet();
|
||||
ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute();
|
||||
searchMetric.getCurrentQueries().inc();
|
||||
GetResponse getResponse = actionFuture.actionGet();
|
||||
searchMetric.getCurrentQueries().dec();
|
||||
searchMetric.getQueries().inc();
|
||||
searchMetric.markTotalQueries(1);
|
||||
if (getResponse.isExists()) {
|
||||
searchMetric.getSucceededQueries().inc();
|
||||
} else {
|
||||
searchMetric.getEmptyQueries().inc();
|
||||
}
|
||||
return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty();
|
||||
}
|
||||
|
||||
|
@ -42,23 +78,46 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
public Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilderConsumer) {
|
||||
MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
|
||||
multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder);
|
||||
MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.execute().actionGet();
|
||||
return multiGetItemResponse.getResponses().length == 0 ? Optional.empty() : Optional.of(multiGetItemResponse);
|
||||
ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute();
|
||||
searchMetric.getCurrentQueries().inc();
|
||||
MultiGetResponse multiGetItemResponse = actionFuture.actionGet();
|
||||
searchMetric.getCurrentQueries().dec();
|
||||
searchMetric.getQueries().inc();
|
||||
searchMetric.markTotalQueries(1);
|
||||
boolean isempty = multiGetItemResponse.getResponses().length == 0;
|
||||
if (isempty) {
|
||||
searchMetric.getEmptyQueries().inc();
|
||||
} else {
|
||||
searchMetric.getSucceededQueries().inc();
|
||||
}
|
||||
return isempty ? Optional.empty() : Optional.of(multiGetItemResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<SearchResponse> search(Consumer<SearchRequestBuilder> queryBuilder) {
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
|
||||
queryBuilder.accept(searchRequestBuilder);
|
||||
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
|
||||
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
|
||||
searchMetric.getCurrentQueries().inc();
|
||||
SearchResponse searchResponse = actionFuture.actionGet();
|
||||
searchMetric.getCurrentQueries().dec();
|
||||
searchMetric.getQueries().inc();
|
||||
searchMetric.markTotalQueries(1);
|
||||
if (searchResponse.getFailedShards() > 0) {
|
||||
StringBuilder sb = new StringBuilder("Search failed:");
|
||||
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
|
||||
sb.append("\n").append(failure.reason());
|
||||
}
|
||||
searchMetric.getEmptyQueries().inc();
|
||||
throw new ElasticsearchException(sb.toString());
|
||||
}
|
||||
return searchResponse.getHits().getHits().length == 0 ? Optional.empty() : Optional.of(searchResponse);
|
||||
boolean isempty = searchResponse.getHits().getHits().length == 0;
|
||||
if (isempty) {
|
||||
searchMetric.getEmptyQueries().inc();
|
||||
} else {
|
||||
searchMetric.getSucceededQueries().inc();
|
||||
}
|
||||
return isempty ? Optional.empty() : Optional.of(searchResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -69,10 +128,25 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
|
||||
SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet();
|
||||
Stream<SearchResponse> infiniteResponses = Stream.iterate(originalSearchResponse,
|
||||
searchResponse -> new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
|
||||
.setScrollId(searchResponse.getScrollId())
|
||||
.setScroll(scrollTime)
|
||||
.execute().actionGet());
|
||||
searchResponse -> {
|
||||
SearchScrollRequestBuilder searchScrollRequestBuilder =
|
||||
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
|
||||
.setScrollId(searchResponse.getScrollId())
|
||||
.setScroll(scrollTime);
|
||||
ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute();
|
||||
searchMetric.getCurrentQueries().inc();
|
||||
SearchResponse searchResponse1 = actionFuture1.actionGet();
|
||||
searchMetric.getCurrentQueries().dec();
|
||||
searchMetric.getQueries().inc();
|
||||
searchMetric.markTotalQueries(1);
|
||||
boolean isempty = searchResponse1.getHits().getHits().length == 0;
|
||||
if (isempty) {
|
||||
searchMetric.getEmptyQueries().inc();
|
||||
} else {
|
||||
searchMetric.getSucceededQueries().inc();
|
||||
}
|
||||
return searchResponse1;
|
||||
});
|
||||
Predicate<SearchResponse> condition = searchResponse -> searchResponse.getHits().getHits().length > 0;
|
||||
Consumer<SearchResponse> lastAction = searchResponse -> {
|
||||
ClearScrollRequestBuilder clearScrollRequestBuilder =
|
||||
|
|
|
@ -27,10 +27,12 @@ public class DefaultBulkController implements BulkController {
|
|||
|
||||
private static final Logger logger = LogManager.getLogger(DefaultBulkController.class);
|
||||
|
||||
private final BulkClient client;
|
||||
private final BulkClient bulkClient;
|
||||
|
||||
private final BulkMetric bulkMetric;
|
||||
|
||||
private BulkProcessor bulkProcessor;
|
||||
|
||||
private final List<String> indexNames;
|
||||
|
||||
private final Map<String, Long> startBulkRefreshIntervals;
|
||||
|
@ -41,15 +43,11 @@ public class DefaultBulkController implements BulkController {
|
|||
|
||||
private final TimeUnit maxWaitTimeUnit;
|
||||
|
||||
private BulkProcessor bulkProcessor;
|
||||
|
||||
private BulkListener bulkListener;
|
||||
|
||||
private final AtomicBoolean active;
|
||||
|
||||
public DefaultBulkController(BulkClient client, BulkMetric bulkMetric) {
|
||||
this.client = client;
|
||||
this.bulkMetric = bulkMetric;
|
||||
public DefaultBulkController(BulkClient bulkClient) {
|
||||
this.bulkClient = bulkClient;
|
||||
this.bulkMetric = new DefaultBulkMetric();
|
||||
this.indexNames = new ArrayList<>();
|
||||
this.active = new AtomicBoolean(false);
|
||||
this.startBulkRefreshIntervals = new HashMap<>();
|
||||
|
@ -65,11 +63,12 @@ public class DefaultBulkController implements BulkController {
|
|||
|
||||
@Override
|
||||
public Throwable getLastBulkError() {
|
||||
return bulkListener.getLastBulkError();
|
||||
return bulkProcessor.getBulkListener().getLastBulkError();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) {
|
||||
bulkMetric.init(settings);
|
||||
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(),
|
||||
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum());
|
||||
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(),
|
||||
|
@ -81,8 +80,8 @@ public class DefaultBulkController implements BulkController {
|
|||
"maxVolumePerRequest"));
|
||||
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
|
||||
Parameters.ENABLE_BULK_LOGGING.getValue());
|
||||
this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
|
||||
this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener)
|
||||
BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
|
||||
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
|
||||
.setBulkActions(maxActionsPerRequest)
|
||||
.setConcurrentRequests(maxConcurrentRequests)
|
||||
.setFlushInterval(flushIngestInterval)
|
||||
|
@ -117,7 +116,7 @@ public class DefaultBulkController implements BulkController {
|
|||
startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds);
|
||||
stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds);
|
||||
if (startRefreshIntervalInSeconds != 0L) {
|
||||
client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s",
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s",
|
||||
30L, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
@ -189,7 +188,7 @@ public class DefaultBulkController implements BulkController {
|
|||
if (indexNames.contains(index)) {
|
||||
Long secs = stopBulkRefreshIntervals.get(index);
|
||||
if (secs != null && secs != 0L) {
|
||||
client.updateIndexSetting(index, "refresh_interval", secs + "s",
|
||||
bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s",
|
||||
30L, TimeUnit.SECONDS);
|
||||
}
|
||||
indexNames.remove(index);
|
||||
|
@ -207,11 +206,11 @@ public class DefaultBulkController implements BulkController {
|
|||
@Override
|
||||
public void close() throws IOException {
|
||||
flush();
|
||||
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
|
||||
if (bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
|
||||
for (String index : indexNames) {
|
||||
Long secs = stopBulkRefreshIntervals.get(index);
|
||||
if (secs != null && secs != 0L)
|
||||
client.updateIndexSetting(index, "refresh_interval", secs + "s",
|
||||
bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s",
|
||||
30L, TimeUnit.SECONDS);
|
||||
}
|
||||
indexNames.clear();
|
||||
|
@ -228,9 +227,5 @@ public class DefaultBulkController implements BulkController {
|
|||
if (bulkProcessor == null) {
|
||||
throw new UnsupportedOperationException("bulk processor not present");
|
||||
}
|
||||
if (bulkListener == null) {
|
||||
throw new UnsupportedOperationException("bulk listener not present");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import org.xbib.elx.api.BulkMetric;
|
|||
|
||||
public class DefaultBulkListener implements BulkListener {
|
||||
|
||||
private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
|
||||
private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName());
|
||||
|
||||
private final BulkController bulkController;
|
||||
|
||||
|
@ -31,15 +31,12 @@ public class DefaultBulkListener implements BulkListener {
|
|||
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {
|
||||
long l = 0;
|
||||
if (bulkMetric != null) {
|
||||
l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().inc();
|
||||
int n = request.numberOfActions();
|
||||
bulkMetric.getSubmitted().inc(n);
|
||||
bulkMetric.getCurrentIngestNumDocs().inc(n);
|
||||
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
|
||||
}
|
||||
long l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().inc();
|
||||
int n = request.numberOfActions();
|
||||
bulkMetric.getSubmitted().inc(n);
|
||||
bulkMetric.getCurrentIngestNumDocs().inc(n);
|
||||
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
|
||||
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
||||
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
|
||||
executionId,
|
||||
|
@ -51,26 +48,19 @@ public class DefaultBulkListener implements BulkListener {
|
|||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
long l = 0;
|
||||
if (bulkMetric != null) {
|
||||
l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
bulkMetric.getSucceeded().inc(response.getItems().length);
|
||||
}
|
||||
long l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
bulkMetric.getSucceeded().inc(response.getItems().length);
|
||||
int n = 0;
|
||||
for (BulkItemResponse itemResponse : response.getItems()) {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
|
||||
}
|
||||
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
|
||||
if (itemResponse.isFailed()) {
|
||||
n++;
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getSucceeded().dec(1);
|
||||
bulkMetric.getFailed().inc(1);
|
||||
}
|
||||
bulkMetric.getSucceeded().dec(1);
|
||||
bulkMetric.getFailed().inc(1);
|
||||
}
|
||||
}
|
||||
if (isBulkLoggingEnabled && bulkMetric != null && logger.isDebugEnabled()) {
|
||||
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
||||
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
|
||||
executionId,
|
||||
bulkMetric.getSucceeded().getCount(),
|
||||
|
@ -84,17 +74,13 @@ public class DefaultBulkListener implements BulkListener {
|
|||
executionId, n, response.buildFailureMessage());
|
||||
}
|
||||
} else {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
|
||||
}
|
||||
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
}
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
lastBulkError = failure;
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("after bulk [" + executionId + "] error", failure);
|
||||
|
|
|
@ -44,6 +44,12 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
start();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void markTotalIngest(long n) {
|
||||
totalIngest.mark(n);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metered getTotalIngest() {
|
||||
return totalIngest;
|
||||
|
|
|
@ -24,8 +24,10 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
|
||||
* (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk
|
||||
* A bulk processor is a thread safe bulk processing class, allowing to easily
|
||||
* set when to "flush" a new bulk request
|
||||
* (either based on number of actions, based on the size, or time), and
|
||||
* to easily control the number of concurrent bulk
|
||||
* requests allowed to be executed in parallel.
|
||||
* In order to create a new bulk processor, use the {@link Builder}.
|
||||
*/
|
||||
|
@ -78,11 +80,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
public static Builder builder(ElasticsearchClient client,
|
||||
BulkListener listener) {
|
||||
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
|
||||
Objects.requireNonNull(listener, "A listener for the BulkProcessor is required but null");
|
||||
return new Builder(client, listener);
|
||||
public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) {
|
||||
Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null");
|
||||
return new Builder(client, bulkListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -149,9 +149,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
* @param request request
|
||||
* @return his bulk processor
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
public DefaultBulkProcessor add(ActionRequest request) {
|
||||
public DefaultBulkProcessor add(ActionRequest<?> request) {
|
||||
internalAdd(request);
|
||||
return this;
|
||||
}
|
||||
|
@ -335,24 +334,25 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final BulkListener listener;
|
||||
private final BulkListener bulkListener;
|
||||
|
||||
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener) {
|
||||
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
|
||||
Objects.requireNonNull(bulkListener, "A listener is required for SyncBulkRequestHandler but null");
|
||||
this.client = client;
|
||||
this.listener = listener;
|
||||
this.bulkListener = bulkListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(BulkRequest bulkRequest, long executionId) {
|
||||
boolean afterCalled = false;
|
||||
try {
|
||||
listener.beforeBulk(executionId, bulkRequest);
|
||||
bulkListener.beforeBulk(executionId, bulkRequest);
|
||||
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
|
||||
afterCalled = true;
|
||||
listener.afterBulk(executionId, bulkRequest, bulkResponse);
|
||||
bulkListener.afterBulk(executionId, bulkRequest, bulkResponse);
|
||||
} catch (Exception e) {
|
||||
if (!afterCalled) {
|
||||
listener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -367,15 +367,16 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final BulkListener listener;
|
||||
private final BulkListener bulkListener;
|
||||
|
||||
private final Semaphore semaphore;
|
||||
|
||||
private final int concurrentRequests;
|
||||
|
||||
private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener listener, int concurrentRequests) {
|
||||
private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) {
|
||||
Objects.requireNonNull(bulkListener, "A listener is required for AsyncBulkRequestHandler but null");
|
||||
this.client = client;
|
||||
this.listener = listener;
|
||||
this.bulkListener = bulkListener;
|
||||
this.concurrentRequests = concurrentRequests;
|
||||
this.semaphore = new Semaphore(concurrentRequests);
|
||||
}
|
||||
|
@ -385,14 +386,14 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
boolean bulkRequestSetupSuccessful = false;
|
||||
boolean acquired = false;
|
||||
try {
|
||||
listener.beforeBulk(executionId, bulkRequest);
|
||||
bulkListener.beforeBulk(executionId, bulkRequest);
|
||||
semaphore.acquire();
|
||||
acquired = true;
|
||||
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
try {
|
||||
listener.afterBulk(executionId, bulkRequest, response);
|
||||
bulkListener.afterBulk(executionId, bulkRequest, response);
|
||||
} finally {
|
||||
semaphore.release();
|
||||
}
|
||||
|
@ -401,7 +402,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
listener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
} finally {
|
||||
semaphore.release();
|
||||
}
|
||||
|
@ -410,9 +411,9 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
bulkRequestSetupSuccessful = true;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
listener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
} catch (Exception e) {
|
||||
listener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
} finally {
|
||||
if (!bulkRequestSetupSuccessful && acquired) {
|
||||
// if we fail on client.bulk() release the semaphore
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.api.SearchMetric;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
import org.xbib.metrics.common.CountMetric;
|
||||
import org.xbib.metrics.common.Meter;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class DefaultSearchMetric implements SearchMetric {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName());
|
||||
|
||||
private final Meter totalQuery;
|
||||
|
||||
private final Count currentQuery;
|
||||
|
||||
private final Count queries;
|
||||
|
||||
private final Count succeededQueries;
|
||||
|
||||
private final Count emptyQueries;
|
||||
|
||||
private Long started;
|
||||
|
||||
private Long stopped;
|
||||
|
||||
public DefaultSearchMetric() {
|
||||
totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor());
|
||||
currentQuery = new CountMetric();
|
||||
queries = new CountMetric();
|
||||
succeededQueries = new CountMetric();
|
||||
emptyQueries = new CountMetric();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) {
|
||||
logger.info("init");
|
||||
start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markTotalQueries(long n) {
|
||||
totalQuery.mark(n);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metered getTotalQueries() {
|
||||
return totalQuery;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Count getCurrentQueries() {
|
||||
return currentQuery;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Count getQueries() {
|
||||
return queries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Count getSucceededQueries() {
|
||||
return succeededQueries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Count getEmptyQueries() {
|
||||
return emptyQueries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long elapsed() {
|
||||
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
this.started = System.nanoTime();
|
||||
totalQuery.start(5L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
this.stopped = System.nanoTime();
|
||||
totalQuery.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
stop();
|
||||
totalQuery.shutdown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
org.xbib.elx.common.MockBulkClientProvider
|
|
@ -0,0 +1 @@
|
|||
org.xbib.elx.common.MockSearchClientProvider
|
|
@ -1,21 +0,0 @@
|
|||
package org.xbib.elx.common.test;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.MockAdminClient;
|
||||
import org.xbib.elx.common.MockAdminClientProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
class MockAdminClientProviderTest {
|
||||
|
||||
@Test
|
||||
void testMockAdminProvider() throws IOException {
|
||||
MockAdminClient client = ClientBuilder.builder()
|
||||
.setAdminClientProvider(MockAdminClientProvider.class)
|
||||
.build();
|
||||
assertNotNull(client);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package org.xbib.elx.common.test;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.MockAdminClient;
|
||||
import org.xbib.elx.common.MockAdminClientProvider;
|
||||
import org.xbib.elx.common.MockBulkClient;
|
||||
import org.xbib.elx.common.MockBulkClientProvider;
|
||||
import org.xbib.elx.common.MockSearchClient;
|
||||
import org.xbib.elx.common.MockSearchClientProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
class MockClientProviderTest {
|
||||
|
||||
@Test
|
||||
void testMockAdminClientProvider() throws IOException {
|
||||
MockAdminClient client = ClientBuilder.builder()
|
||||
.setAdminClientProvider(MockAdminClientProvider.class)
|
||||
.build();
|
||||
assertNotNull(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMockBulkClientProvider() throws IOException {
|
||||
MockBulkClient client = ClientBuilder.builder()
|
||||
.setBulkClientProvider(MockBulkClientProvider.class)
|
||||
.build();
|
||||
assertNotNull(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMockSearchClientProvider() throws IOException {
|
||||
MockSearchClient client = ClientBuilder.builder()
|
||||
.setSearchClientProvider(MockSearchClientProvider.class)
|
||||
.build();
|
||||
assertNotNull(client);
|
||||
}
|
||||
}
|
|
@ -6,13 +6,12 @@ import org.elasticsearch.action.bulk.BulkAction;
|
|||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -54,16 +53,14 @@ class SearchTest {
|
|||
client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet();
|
||||
for (int i = 0; i < 1; i++) {
|
||||
QueryBuilder queryStringBuilder = QueryBuilders.queryStringQuery("rs:" + 1234);
|
||||
SearchSourceBuilder searchSource = new SearchSourceBuilder();
|
||||
searchSource.query(queryStringBuilder);
|
||||
searchSource.sort("rowcount", SortOrder.DESC);
|
||||
searchSource.from(i * 10);
|
||||
searchSource.size(10);
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices("pages");
|
||||
searchRequest.types("row");
|
||||
searchRequest.source(searchSource);
|
||||
SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||
.setIndices("pages")
|
||||
.setTypes("row")
|
||||
.setQuery(queryStringBuilder)
|
||||
.addSort("rowcount", SortOrder.DESC)
|
||||
.setFrom(i * 10)
|
||||
.setSize(10);
|
||||
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
|
||||
assertTrue(searchResponse.getHits().getTotalHits() > 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package org.xbib.elx.common.test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
|
||||
|
@ -19,6 +17,7 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
class SimpleTest {
|
||||
|
|
|
@ -46,7 +46,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance";
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
|
@ -73,17 +73,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
Object obj = response.iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
String host = null;
|
||||
int port = 0;
|
||||
if (obj instanceof InetSocketTransportAddress) {
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
|
@ -99,7 +88,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -107,12 +95,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome() + "/data"));
|
||||
logger.info("data files wiped");
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
private void closeNodes(Helper helper) {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
|
@ -128,7 +116,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
|
@ -146,7 +134,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
String home = System.getProperty("path.home", "build/elxnode");
|
||||
helper.setHome(home + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
|
@ -158,6 +147,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
String cluster;
|
||||
|
||||
String host;
|
||||
|
||||
int port;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
|
@ -187,6 +180,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
void startNode(String id) {
|
||||
buildNode(id).start();
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client(id). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
Object obj = response.iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
if (obj instanceof InetSocketTransportAddress) {
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
|
|
|
@ -5,12 +5,11 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
|
@ -52,13 +51,11 @@ class WildcardTest {
|
|||
}
|
||||
|
||||
private long count(ElasticsearchClient client, QueryBuilder queryBuilder) {
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder()
|
||||
.query(queryBuilder);
|
||||
SearchRequest searchRequest = new SearchRequest()
|
||||
.indices("index")
|
||||
.types("type")
|
||||
.source(builder);
|
||||
return client.execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||
.setIndices("index")
|
||||
.setTypes("type")
|
||||
.setQuery(queryBuilder);
|
||||
return searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
|
||||
}
|
||||
|
||||
private void validateCount(ElasticsearchClient client, QueryBuilder queryBuilder, long expectedHits) {
|
||||
|
|
|
@ -1 +1,4 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package org.xbib.elx.common.test;
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.xbib.elx.node;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.common.AbstractAdminClient;
|
||||
import java.io.IOException;
|
||||
|
||||
public class NodeAdminClient extends AbstractAdminClient {
|
||||
|
||||
|
@ -13,12 +14,12 @@ public class NodeAdminClient extends AbstractAdminClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
protected ElasticsearchClient createClient(Settings settings) {
|
||||
return helper.createClient(settings, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
helper.closeClient(settings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.xbib.elx.node;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.common.AbstractBulkClient;
|
||||
import java.io.IOException;
|
||||
|
||||
public class NodeBulkClient extends AbstractBulkClient {
|
||||
|
||||
|
@ -13,12 +14,12 @@ public class NodeBulkClient extends AbstractBulkClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
protected ElasticsearchClient createClient(Settings settings) {
|
||||
return helper.createClient(settings, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
helper.closeClient(settings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,8 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -34,7 +36,7 @@ public class NodeClientHelper {
|
|||
key -> innerCreateClient(settings));
|
||||
}
|
||||
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name"));
|
||||
if (client != null) {
|
||||
logger.debug("closing node...");
|
||||
|
@ -49,20 +51,38 @@ public class NodeClientHelper {
|
|||
+ " " + System.getProperty("java.vm.vendor")
|
||||
+ " " + System.getProperty("java.runtime.version")
|
||||
+ " " + System.getProperty("java.vm.version");
|
||||
Settings effectiveSettings = Settings.builder().put(settings)
|
||||
Settings effectiveSettings = Settings.builder()
|
||||
.put(filterSettings(settings))
|
||||
.put("node.client", true)
|
||||
.put("node.master", false)
|
||||
.put("node.data", false)
|
||||
.put("path.home", settings.get("path.home"))
|
||||
.build();
|
||||
logger.info("creating node client on {} with effective settings {}",
|
||||
version, effectiveSettings.getAsMap());
|
||||
version, effectiveSettings.toDelimitedString(','));
|
||||
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
|
||||
node = new BulkNode(new Environment(effectiveSettings), plugins);
|
||||
node.start();
|
||||
return node.client();
|
||||
}
|
||||
|
||||
private static Settings filterSettings(Settings settings) {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
|
||||
if (!isPrivateSettings(entry.getKey())) {
|
||||
builder.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private static boolean isPrivateSettings(String key) {
|
||||
return key.equals(Parameters.MAX_ACTIONS_PER_REQUEST.name()) ||
|
||||
key.equals(Parameters.MAX_CONCURRENT_REQUESTS.name()) ||
|
||||
key.equals(Parameters.MAX_VOLUME_PER_REQUEST.name()) ||
|
||||
key.equals(Parameters.FLUSH_INTERVAL.name());
|
||||
}
|
||||
|
||||
private static class BulkNode extends Node {
|
||||
|
||||
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.xbib.elx.node;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.common.AbstractSearchClient;
|
||||
import java.io.IOException;
|
||||
|
||||
public class NodeSearchClient extends AbstractSearchClient {
|
||||
|
||||
|
@ -13,12 +14,12 @@ public class NodeSearchClient extends AbstractSearchClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
protected ElasticsearchClient createClient(Settings settings) {
|
||||
return helper.createClient(settings, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
helper.closeClient(settings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,45 +41,42 @@ class BulkClientTest {
|
|||
|
||||
@Test
|
||||
void testSingleDoc() throws Exception {
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30))
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} finally {
|
||||
assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testNewIndex() throws Exception {
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
|
||||
.build();
|
||||
bulkClient.newIndex("test");
|
||||
bulkClient.close();
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMapping() throws Exception {
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client)
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build();
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build()) {
|
||||
|
@ -101,13 +98,12 @@ class BulkClientTest {
|
|||
@Test
|
||||
void testRandomDocs() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
|
@ -115,15 +111,13 @@ class BulkClientTest {
|
|||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} finally {
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
bulkClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,14 +127,13 @@ class BulkClientTest {
|
|||
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
||||
final long actions = ACTIONS;
|
||||
logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
|
@ -171,15 +164,13 @@ class BulkClientTest {
|
|||
logger.warn("latch timeout");
|
||||
}
|
||||
bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS);
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
} finally {
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test"));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test"));
|
||||
bulkClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ class DuplicateIDTest {
|
|||
@Test
|
||||
void testDuplicateDocIDs() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
|
@ -47,7 +47,7 @@ class DuplicateIDTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -33,11 +33,11 @@ class IndexPruneTest {
|
|||
|
||||
@Test
|
||||
void testPrune() throws IOException {
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client)
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build();
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build()) {
|
||||
|
|
|
@ -33,11 +33,11 @@ class IndexShiftTest {
|
|||
|
||||
@Test
|
||||
void testIndexShift() throws Exception {
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client)
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build();
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build()) {
|
||||
|
|
|
@ -36,13 +36,11 @@ class SearchTest {
|
|||
|
||||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
|
@ -51,14 +49,14 @@ class SearchTest {
|
|||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
assertEquals(ACTIONS, bulkClient.getSearchableDocs("test"));
|
||||
assertEquals(ACTIONS, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client)
|
||||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build()) {
|
||||
|
@ -67,7 +65,7 @@ class SearchTest {
|
|||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMinutes(1), 10);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
assertEquals(ACTIONS, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
|
@ -76,7 +74,10 @@ class SearchTest {
|
|||
logger.info(id);
|
||||
idcount.incrementAndGet();
|
||||
});
|
||||
assertEquals(numactions, idcount.get());
|
||||
assertEquals(ACTIONS, idcount.get());
|
||||
assertEquals(11, searchClient.getSearchMetric().getQueries().getCount());
|
||||
assertEquals(9, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,23 +30,28 @@ class SmokeTest {
|
|||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client)
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build();
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke_definition", Settings.EMPTY);
|
||||
assertEquals(0, indexDefinition.getReplicaLevel());
|
||||
assertEquals(helper.getClusterName(), adminClient.getClusterName());
|
||||
bulkClient.newIndex("test_smoke");
|
||||
logger.info("new index: done");
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
logger.info("index doc: done");
|
||||
bulkClient.flush();
|
||||
logger.info("flush: done");
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
logger.info("wait: done");
|
||||
adminClient.checkMapping("test_smoke");
|
||||
logger.info("check mapping: done");
|
||||
bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
|
||||
bulkClient.delete("test_smoke", "1");
|
||||
bulkClient.flush();
|
||||
|
@ -56,6 +61,7 @@ class SmokeTest {
|
|||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.deleteIndex("test_smoke");
|
||||
logger.info("delete index: done");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
bulkClient.flush();
|
||||
|
@ -63,13 +69,14 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
logger.info("done");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
|||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -33,8 +32,7 @@ import java.nio.file.Path;
|
|||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -42,11 +40,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance";
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
|
@ -62,73 +56,35 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
@Override
|
||||
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
// initialize new helper here, increase counter
|
||||
return extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
|
||||
return extensionContext.getParent().isPresent() ?
|
||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
Helper helper = extensionContext.getParent().isPresent() ?
|
||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
|
||||
Objects.requireNonNull(helper);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
Object obj = response.iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
String host = null;
|
||||
int port = 0;
|
||||
if (obj instanceof InetSocketTransportAddress) {
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
}
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
helper.startNode();
|
||||
helper.greenHealth();
|
||||
logger.info("cluser name = {}", helper.clusterName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome() + "/data"));
|
||||
logger.info("data files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : helper.nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
logger.info("all nodes closed");
|
||||
Helper helper = extensionContext.getParent().isPresent() ?
|
||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
|
||||
Objects.requireNonNull(helper);
|
||||
helper.closeNodes();
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("files wiped");
|
||||
Thread.sleep(1000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
|
@ -146,7 +102,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
String home = System.getProperty("path.home", "build/elxnode");
|
||||
helper.setHome(home + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
|
@ -158,9 +115,13 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
String cluster;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
String host;
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
int port;
|
||||
|
||||
Node node;
|
||||
|
||||
AbstractClient client;
|
||||
|
||||
void setHome(String home) {
|
||||
this.home = home;
|
||||
|
@ -180,18 +141,23 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
Settings getNodeSettings() {
|
||||
return Settings.builder()
|
||||
.put("name", "elx-client") // for Threadpool name
|
||||
.put("name", "elx-client") // for threadpool name
|
||||
.put("cluster.name", getClusterName())
|
||||
.put("path.home", getHome())
|
||||
.build();
|
||||
}
|
||||
|
||||
void startNode(String id) {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
return clients.get(id);
|
||||
void startNode() {
|
||||
buildNode().start();
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
Object obj = response.iterator().next().getTransport().getAddress().publishAddress();
|
||||
if (obj instanceof InetSocketTransportAddress) {
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
}
|
||||
}
|
||||
|
||||
String randomString(int len) {
|
||||
|
@ -203,16 +169,51 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
return new String(buf);
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Node buildNode() {
|
||||
String id = "1";
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
Node node = new MockNode(nodeSettings);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
node = new MockNode(nodeSettings);
|
||||
client = (AbstractClient) node.client();
|
||||
return node;
|
||||
}
|
||||
|
||||
void closeNodes() {
|
||||
if (client != null) {
|
||||
logger.info("closing client");
|
||||
client.close();
|
||||
}
|
||||
if (node != null) {
|
||||
logger.info("closing all nodes");
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
|
||||
void greenHealth() throws IOException {
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
}
|
||||
|
||||
String clusterName() {
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
return clusterStateResponse.getClusterName().value();
|
||||
}
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package org.xbib.elx.node.test;
|
|
@ -10,4 +10,4 @@
|
|||
<AppenderRef ref="Console" />
|
||||
</Root>
|
||||
</Loggers>
|
||||
</configuration>
|
||||
</configuration>
|
||||
|
|
|
@ -19,8 +19,8 @@ public class TransportAdminClient extends AbstractAdminClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
return helper.createClient(settings, null);
|
||||
public ElasticsearchClient createClient(Settings settings) throws IOException {
|
||||
return helper.createClient(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -30,7 +30,7 @@ public class TransportAdminClient extends AbstractAdminClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
helper.closeClient(settings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ public class TransportBulkClient extends AbstractBulkClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
return helper.createClient(settings, null);
|
||||
public ElasticsearchClient createClient(Settings settings) throws IOException {
|
||||
return helper.createClient(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -29,7 +29,7 @@ public class TransportBulkClient extends AbstractBulkClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
helper.closeClient(settings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,23 +29,16 @@ public class TransportClientHelper {
|
|||
|
||||
private static final Logger logger = LogManager.getLogger(TransportClientHelper.class.getName());
|
||||
|
||||
private static Object configurationObject;
|
||||
|
||||
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
|
||||
|
||||
public ElasticsearchClient createClient(Settings settings, Object object) {
|
||||
if (configurationObject == null && object != null) {
|
||||
configurationObject = object;
|
||||
}
|
||||
if (configurationObject instanceof ElasticsearchClient) {
|
||||
return (ElasticsearchClient) configurationObject;
|
||||
}
|
||||
return clientMap.computeIfAbsent(settings.get("cluster.name"),
|
||||
key -> innerCreateClient(settings));
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
String clusterName = settings.get("cluster.name", "elasticsearch");
|
||||
return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
|
||||
}
|
||||
|
||||
public void closeClient(Settings settings) {
|
||||
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name"));
|
||||
String clusterName = settings.get("cluster.name", "elasticsearch");
|
||||
ElasticsearchClient client = clientMap.remove(clusterName);
|
||||
if (client != null) {
|
||||
if (client instanceof Client) {
|
||||
((Client) client).close();
|
||||
|
@ -57,8 +50,8 @@ public class TransportClientHelper {
|
|||
public void init(TransportClient transportClient, Settings settings) throws IOException {
|
||||
Collection<TransportAddress> addrs = findAddresses(settings);
|
||||
if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) {
|
||||
throw new NoNodeAvailableException("no cluster nodes available, check settings "
|
||||
+ settings.getAsMap());
|
||||
throw new NoNodeAvailableException("no cluster nodes available, check settings = "
|
||||
+ settings.toDelimitedString(','));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -77,12 +70,13 @@ public class TransportClientHelper {
|
|||
} catch (NumberFormatException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
if (splitHost.length == 1) {
|
||||
} else if (splitHost.length == 1) {
|
||||
String host = splitHost[0];
|
||||
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
|
||||
TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort);
|
||||
addresses.add(address);
|
||||
} else {
|
||||
throw new IOException("invalid hostname specification: " + hostname);
|
||||
}
|
||||
}
|
||||
return addresses;
|
||||
|
@ -96,47 +90,45 @@ public class TransportClientHelper {
|
|||
logger.info("connected to nodes = {}", nodes);
|
||||
if (nodes != null && !nodes.isEmpty()) {
|
||||
if (autodiscover) {
|
||||
logger.debug("trying to auto-discover all nodes...");
|
||||
logger.debug("trying to discover all nodes...");
|
||||
ClusterStateRequestBuilder clusterStateRequestBuilder =
|
||||
new ClusterStateRequestBuilder(transportClient, ClusterStateAction.INSTANCE);
|
||||
ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
|
||||
DiscoveryNodes discoveryNodes = clusterStateResponse.getState().getNodes();
|
||||
addDiscoveryNodes(transportClient, discoveryNodes);
|
||||
logger.info("after auto-discovery: connected to {}", transportClient.connectedNodes());
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
transportClient.addTransportAddress(discoveryNode.getAddress());
|
||||
}
|
||||
logger.info("after discovery: connected to {}", transportClient.connectedNodes());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void addDiscoveryNodes(TransportClient transportClient, DiscoveryNodes discoveryNodes) {
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
transportClient.addTransportAddress(discoveryNode.getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
private ElasticsearchClient innerCreateClient(Settings settings) {
|
||||
String systemIdentifier = System.getProperty("os.name")
|
||||
+ " " + System.getProperty("java.vm.name")
|
||||
+ " " + System.getProperty("java.vm.vendor")
|
||||
+ " " + System.getProperty("java.vm.version")
|
||||
+ " Elasticsearch " + Version.CURRENT.toString();
|
||||
Settings effectiveSettings = Settings.builder()
|
||||
// for thread pool size
|
||||
.put("processors",
|
||||
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
|
||||
logger.info("creating transport client on {} with custom settings {}",
|
||||
systemIdentifier, settings.getAsMap());
|
||||
// we need to disable dead lock check because we may have mixed node/transport clients
|
||||
DefaultChannelFuture.setUseDeadLockChecker(false);
|
||||
return TransportClient.builder()
|
||||
.settings(getTransportClientSettings(settings))
|
||||
.build();
|
||||
}
|
||||
|
||||
private Settings getTransportClientSettings(Settings settings) {
|
||||
return Settings.builder()
|
||||
.put("cluster.name", settings.get("cluster.name", "elasticsearch"))
|
||||
.put("path.home", settings.get("path.home", "."))
|
||||
.put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) // for thread pool size
|
||||
.put("client.transport.sniff", false) // do not sniff
|
||||
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
|
||||
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
|
||||
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
|
||||
// custom settings may override defaults
|
||||
.put(settings)
|
||||
.build();
|
||||
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
|
||||
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
|
||||
|
||||
// we need to disable dead lock check because we may have mixed node/transport clients
|
||||
DefaultChannelFuture.setUseDeadLockChecker(false);
|
||||
return TransportClient.builder().settings(effectiveSettings).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ public class TransportSearchClient extends AbstractSearchClient {
|
|||
|
||||
@Override
|
||||
public ElasticsearchClient createClient(Settings settings) throws IOException {
|
||||
return helper.createClient(settings, null);
|
||||
return helper.createClient(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -29,7 +29,7 @@ public class TransportSearchClient extends AbstractSearchClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void closeClient(Settings settings) {
|
||||
public void closeClient(Settings settings) throws IOException {
|
||||
helper.closeClient(settings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package org.xbib.elx.transport.test;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -42,35 +41,32 @@ class BulkClientTest {
|
|||
|
||||
@Test
|
||||
void testSingleDoc() throws Exception {
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30))
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} finally {
|
||||
assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testNewIndex() throws Exception {
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
bulkClient.newIndex("test");
|
||||
bulkClient.close();
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -80,9 +76,9 @@ class BulkClientTest {
|
|||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.startObject("doc")
|
||||
|
@ -101,13 +97,12 @@ class BulkClientTest {
|
|||
@Test
|
||||
void testRandomDocs() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
|
@ -115,35 +110,30 @@ class BulkClientTest {
|
|||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} finally {
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
bulkClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testThreadedRandomDocs() throws Exception {
|
||||
void testThreadedRandomDocs() {
|
||||
int maxthreads = Runtime.getRuntime().availableProcessors();
|
||||
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
||||
final long actions = ACTIONS;
|
||||
logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads * 2)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
|
||||
.put(Parameters.ENABLE_BULK_LOGGING.name(), "true")
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
|
@ -175,19 +165,15 @@ class BulkClientTest {
|
|||
logger.warn("latch timeout");
|
||||
}
|
||||
bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS);
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
} finally {
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test"));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test"));
|
||||
bulkClient.close();
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ class DuplicateIDTest {
|
|||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 5L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
|
@ -45,7 +45,7 @@ class DuplicateIDTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ class IndexShiftTest {
|
|||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test
|
||||
void testIndexShift() throws Exception {
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
|
|
|
@ -39,12 +39,11 @@ class SearchTest {
|
|||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
|
@ -54,12 +53,12 @@ class SearchTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (TransportSearchClient searchClient = ClientBuilder.builder()
|
||||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
|
|
|
@ -28,7 +28,6 @@ class SmokeTest {
|
|||
this.helper = helper;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
|
@ -64,8 +63,8 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
|||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.support.AbstractClient;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -33,8 +32,8 @@ import java.nio.file.Path;
|
|||
import java.nio.file.Paths;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -42,11 +41,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private static final Logger logger = LogManager.getLogger("test");
|
||||
|
||||
private static final Random random = new Random();
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final String key = "es-instance";
|
||||
private static final String key = "es-instance-";
|
||||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
|
@ -62,71 +57,35 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
@Override
|
||||
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
|
||||
throws ParameterResolutionException {
|
||||
// initialize new helper here, increase counter
|
||||
return extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class);
|
||||
return extensionContext.getParent().isPresent() ?
|
||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.incrementAndGet(), key -> create(), Helper.class) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
Helper helper = extensionContext.getParent().isPresent() ?
|
||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
|
||||
Objects.requireNonNull(helper);
|
||||
logger.info("starting cluster with helper " + helper + " at " + helper.getHome());
|
||||
helper.startNode("1");
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = helper.client("1"). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
Object obj = response.iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
if (obj instanceof InetSocketTransportAddress) {
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
|
||||
helper.host = address.address().getHostName();
|
||||
helper.port = address.address().getPort();
|
||||
}
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
logger.info("cluster name = {}", clusterStateResponse.getClusterName().value());
|
||||
logger.info("host = {} port = {}", helper.host, helper.port);
|
||||
helper.startNode();
|
||||
helper.greenHealth();
|
||||
logger.info("cluster name = {}", helper.clusterName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext extensionContext) throws Exception {
|
||||
Helper helper = extensionContext.getParent().get().getStore(ns)
|
||||
.getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class);
|
||||
closeNodes(helper);
|
||||
deleteFiles(Paths.get(helper.getHome() + "/data"));
|
||||
logger.info("data files wiped");
|
||||
Thread.sleep(2000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private void closeNodes(Helper helper) throws IOException {
|
||||
logger.info("closing all clients");
|
||||
for (AbstractClient client : helper.clients.values()) {
|
||||
client.close();
|
||||
}
|
||||
logger.info("closing all nodes");
|
||||
for (Node node : helper.nodes.values()) {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
logger.info("all nodes closed");
|
||||
Helper helper = extensionContext.getParent().isPresent() ?
|
||||
extensionContext.getParent().get().getStore(ns).getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class) : null;
|
||||
Objects.requireNonNull(helper);
|
||||
helper.closeNodes();
|
||||
deleteFiles(Paths.get(helper.getHome()));
|
||||
logger.info("files wiped");
|
||||
Thread.sleep(1000L); // let OS commit changes
|
||||
}
|
||||
|
||||
private static void deleteFiles(Path directory) throws IOException {
|
||||
if (Files.exists(directory)) {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
|
||||
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Files.delete(file);
|
||||
|
@ -144,7 +103,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
private Helper create() {
|
||||
Helper helper = new Helper();
|
||||
helper.setHome(System.getProperty("path.home") + "/" + helper.randomString(8));
|
||||
String home = System.getProperty("path.home", "build/elxtransport");
|
||||
helper.setHome(home + "/" + helper.randomString(8));
|
||||
helper.setClusterName("test-cluster-" + helper.randomString(8));
|
||||
logger.info("cluster: " + helper.getClusterName() + " home: " + helper.getHome());
|
||||
return helper;
|
||||
|
@ -160,9 +120,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
|
||||
int port;
|
||||
|
||||
Map<String, Node> nodes = new HashMap<>();
|
||||
Node node;
|
||||
|
||||
Map<String, AbstractClient> clients = new HashMap<>();
|
||||
AbstractClient client;
|
||||
|
||||
void setHome(String home) {
|
||||
this.home = home;
|
||||
|
@ -196,12 +156,18 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
.build();
|
||||
}
|
||||
|
||||
void startNode(String id) {
|
||||
buildNode(id).start();
|
||||
}
|
||||
|
||||
ElasticsearchClient client(String id) {
|
||||
return clients.get(id);
|
||||
void startNode() {
|
||||
buildNode().start();
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
|
||||
NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
|
||||
Object obj = response.iterator().next().getTransport().getAddress()
|
||||
.publishAddress();
|
||||
if (obj instanceof InetSocketTransportAddress) {
|
||||
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
|
||||
host = address.address().getHostName();
|
||||
port = address.address().getPort();
|
||||
logger.info("host = {} port = {}", host, port);
|
||||
}
|
||||
}
|
||||
|
||||
String randomString(int len) {
|
||||
|
@ -213,16 +179,51 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
return new String(buf);
|
||||
}
|
||||
|
||||
private Node buildNode(String id) {
|
||||
Node buildNode() {
|
||||
String id = "1";
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(getNodeSettings())
|
||||
.put("node.name", id)
|
||||
.build();
|
||||
Node node = new MockNode(nodeSettings);
|
||||
AbstractClient client = (AbstractClient) node.client();
|
||||
nodes.put(id, node);
|
||||
clients.put(id, client);
|
||||
node = new MockNode(nodeSettings);
|
||||
client = (AbstractClient) node.client();
|
||||
return node;
|
||||
}
|
||||
|
||||
void closeNodes() {
|
||||
if (client != null) {
|
||||
logger.info("closing client");
|
||||
client.close();
|
||||
}
|
||||
if (node != null) {
|
||||
logger.info("closing node");
|
||||
node.close();
|
||||
}
|
||||
}
|
||||
|
||||
void greenHealth() throws IOException {
|
||||
try {
|
||||
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
|
||||
.timeout(TimeValue.timeValueSeconds(30))).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
throw new IOException("cluster state is " + healthResponse.getStatus().name()
|
||||
+ ", from here on, everything will fail!");
|
||||
}
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
|
||||
}
|
||||
}
|
||||
|
||||
String clusterName() {
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
return clusterStateResponse.getClusterName().value();
|
||||
}
|
||||
|
||||
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
|
||||
|
||||
private static final Random random = new SecureRandom();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,4 +10,4 @@
|
|||
<AppenderRef ref="Console" />
|
||||
</Root>
|
||||
</Loggers>
|
||||
</configuration>
|
||||
</configuration>
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
group = org.xbib
|
||||
name = elx
|
||||
version = 2.2.1.19
|
||||
version = 2.2.1.20
|
||||
|
||||
gradle.wrapper.version = 6.4.1
|
||||
xbib-metrics.version = 2.1.0
|
||||
|
|
Loading…
Reference in a new issue