adjust to elx API update on 6.3 branch
This commit is contained in:
parent
f7509bcecd
commit
794446534c
31 changed files with 540 additions and 930 deletions
|
@ -22,6 +22,10 @@ public interface AdminClient extends NativeClient {
|
|||
*/
|
||||
IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException;
|
||||
|
||||
Map<String, ?> getMapping(String index) throws IOException;
|
||||
|
||||
Map<String, ?> getMapping(String index, String type) throws IOException;
|
||||
|
||||
/**
|
||||
* Delete an index.
|
||||
* @param indexDefinition the index definition
|
||||
|
@ -89,17 +93,6 @@ public interface AdminClient extends NativeClient {
|
|||
*/
|
||||
boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit);
|
||||
|
||||
|
||||
/**
|
||||
* Wait for index recovery (after replica change).
|
||||
*
|
||||
* @param index index
|
||||
* @param maxWaitTime maximum wait time
|
||||
* @param timeUnit time unit
|
||||
* @return true if wait succeeded, false if wait timed out
|
||||
*/
|
||||
boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit);
|
||||
|
||||
/**
|
||||
* Resolve alias.
|
||||
*
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package org.xbib.elx.api;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
|
||||
|
@ -7,6 +8,8 @@ import java.io.Closeable;
|
|||
|
||||
public interface BulkMetric extends Closeable {
|
||||
|
||||
void init(Settings settings);
|
||||
|
||||
Metered getTotalIngest();
|
||||
|
||||
Count getTotalIngestSizeInBytes();
|
||||
|
|
|
@ -55,7 +55,7 @@ public interface NativeClient extends Closeable {
|
|||
*/
|
||||
void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit);
|
||||
|
||||
Map<String, ?> getMapping(String index, String mapping);
|
||||
void waitForShards(long maxWaitTime, TimeUnit timeUnit);
|
||||
|
||||
long getSearchableDocs(String index);
|
||||
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
package org.xbib.elx.api;
|
||||
|
||||
public interface ReadClientProvider<C extends ReadClient> {
|
||||
|
||||
C getReadClient();
|
||||
}
|
|
@ -4,9 +4,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
|
@ -26,22 +23,24 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
|
|||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.metadata.AliasAction;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
|
@ -53,9 +52,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|||
import org.elasticsearch.search.sort.SortBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.xbib.elx.api.AdminClient;
|
||||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.ExtendedClient;
|
||||
import org.xbib.elx.api.IndexAliasAdder;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.api.IndexPruneResult;
|
||||
|
@ -71,19 +70,11 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.time.LocalDate;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.regex.Matcher;
|
||||
|
@ -92,7 +83,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
public abstract class AbstractAdminClient extends AbstractNativeClient implements AdminClient {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName());
|
||||
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
|
||||
|
||||
/**
|
||||
* The one and only index type name used in the extended client.
|
||||
|
@ -100,17 +91,6 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
*/
|
||||
private static final String TYPE_NAME = "doc";
|
||||
|
||||
/**
|
||||
* The Elasticsearch client.
|
||||
*/
|
||||
private ElasticsearchClient client;
|
||||
|
||||
private BulkMetric bulkMetric;
|
||||
|
||||
private BulkController bulkController;
|
||||
|
||||
private AtomicBoolean closed;
|
||||
|
||||
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
|
||||
@Override
|
||||
public List<String> getMovedAliases() {
|
||||
|
@ -146,348 +126,53 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
};
|
||||
|
||||
@Override
|
||||
public AbstractExtendedClient setClient(ElasticsearchClient client) {
|
||||
this.client = client;
|
||||
return this;
|
||||
public Map<String, ?> getMapping(String index) throws IOException {
|
||||
return getMapping(index, TYPE_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElasticsearchClient getClient() {
|
||||
return client;
|
||||
public Map<String, ?> getMapping(String index, String mapping) throws IOException {
|
||||
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
||||
.setIndices(index)
|
||||
.setTypes(mapping);
|
||||
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
|
||||
logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap());
|
||||
return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkMetric getBulkMetric() {
|
||||
return bulkMetric;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkController getBulkController() {
|
||||
return bulkController;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractExtendedClient init(Settings settings) throws IOException {
|
||||
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
||||
if (client == null) {
|
||||
client = createClient(settings);
|
||||
}
|
||||
if (bulkMetric == null) {
|
||||
bulkMetric = new DefaultBulkMetric();
|
||||
bulkMetric.init(settings);
|
||||
}
|
||||
if (bulkController == null) {
|
||||
bulkController = new DefaultBulkController(this, bulkMetric);
|
||||
bulkController.init(settings);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
if (bulkController != null) {
|
||||
bulkController.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
ensureActive();
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
if (bulkMetric != null) {
|
||||
logger.info("closing bulk metric");
|
||||
bulkMetric.close();
|
||||
}
|
||||
if (bulkController != null) {
|
||||
logger.info("closing bulk controller");
|
||||
bulkController.close();
|
||||
}
|
||||
closeClient();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterName() {
|
||||
ensureActive();
|
||||
try {
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear();
|
||||
ClusterStateResponse clusterStateResponse =
|
||||
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
return clusterStateResponse.getClusterName().value();
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return "TIMEOUT";
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return "DISCONNECTED";
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return "[" + e.getMessage() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException {
|
||||
ensureActive();
|
||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||
URL indexSettings = indexDefinition.getSettingsUrl();
|
||||
if (indexSettings == null) {
|
||||
logger.warn("warning while creating index '{}', no settings/mappings",
|
||||
indexDefinition.getFullIndexName());
|
||||
newIndex(indexDefinition.getFullIndexName());
|
||||
return this;
|
||||
}
|
||||
URL indexMappings = indexDefinition.getMappingsUrl();
|
||||
if (indexMappings == null) {
|
||||
logger.warn("warning while creating index '{}', no mappings",
|
||||
indexDefinition.getFullIndexName());
|
||||
newIndex(indexDefinition.getFullIndexName(), indexSettings.openStream(), null);
|
||||
return this;
|
||||
}
|
||||
try (InputStream indexSettingsInput = indexSettings.openStream();
|
||||
InputStream indexMappingsInput = indexMappings.openStream()) {
|
||||
newIndex(indexDefinition.getFullIndexName(), indexSettingsInput, indexMappingsInput);
|
||||
} catch (IOException e) {
|
||||
if (indexDefinition.ignoreErrors()) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
logger.warn("warning while creating index '{}' with settings at {} and mappings at {}",
|
||||
indexDefinition.getFullIndexName(), indexSettings, indexMappings);
|
||||
} else {
|
||||
logger.error("error while creating index '{}' with settings at {} and mappings at {}",
|
||||
indexDefinition.getFullIndexName(), indexSettings, indexMappings);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(String index) throws IOException {
|
||||
return newIndex(index, Settings.EMPTY, (Map<String, Object>) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException {
|
||||
return newIndex(index,
|
||||
Settings.builder().loadFromStream(".json", settings, true).build(),
|
||||
mapping != null ? JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mapping).mapOrdered() : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(String index, Settings settings) throws IOException {
|
||||
return newIndex(index, settings, (Map<String, Object>) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException {
|
||||
return newIndex(index, settings,
|
||||
mapping != null ? JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mapping).mapOrdered() : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException {
|
||||
ensureActive();
|
||||
if (index == null) {
|
||||
logger.warn("no index name given to create index");
|
||||
return this;
|
||||
}
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
|
||||
if (settings != null) {
|
||||
createIndexRequest.settings(settings);
|
||||
}
|
||||
if (mapping != null) {
|
||||
createIndexRequest.mapping(TYPE_NAME, mapping);
|
||||
}
|
||||
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
logger.info("index {} created: {}", index,
|
||||
Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS)));
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient deleteIndex(IndexDefinition indexDefinition) {
|
||||
public AdminClient deleteIndex(IndexDefinition indexDefinition) {
|
||||
return deleteIndex(indexDefinition.getFullIndexName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient deleteIndex(String index) {
|
||||
ensureActive();
|
||||
public AdminClient deleteIndex(String index) {
|
||||
ensureClientIsPresent();
|
||||
if (index == null) {
|
||||
logger.warn("no index name given to delete index");
|
||||
return this;
|
||||
}
|
||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
|
||||
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
|
||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||
waitForShards(30L, TimeUnit.SECONDS);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient startBulk(IndexDefinition indexDefinition) throws IOException {
|
||||
startBulk(indexDefinition.getFullIndexName(), -1, 1);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
|
||||
throws IOException {
|
||||
if (bulkController != null) {
|
||||
ensureActive();
|
||||
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException {
|
||||
if (bulkController != null) {
|
||||
ensureActive();
|
||||
bulkController.stopBulkMode(indexDefinition);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException {
|
||||
if (bulkController != null) {
|
||||
ensureActive();
|
||||
bulkController.stopBulkMode(index, timeout, timeUnit);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(String index, String id, boolean create, String source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source, XContentType.JSON));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(IndexRequest indexRequest) {
|
||||
ensureActive();
|
||||
bulkController.index(indexRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient delete(String index, String id) {
|
||||
return delete(new DeleteRequest(index, TYPE_NAME, id));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient delete(DeleteRequest deleteRequest) {
|
||||
ensureActive();
|
||||
bulkController.delete(deleteRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient update(String index, String id, BytesReference source) {
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id)
|
||||
.doc(source, XContentType.JSON));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient update(String index, String id, String source) {
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id)
|
||||
.doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient update(UpdateRequest updateRequest) {
|
||||
ensureActive();
|
||||
bulkController.update(updateRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
|
||||
ensureActive();
|
||||
return bulkController.waitForResponses(timeout, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureActive();
|
||||
ensureIndexGiven(index);
|
||||
GetSettingsRequest settingsRequest = new GetSettingsRequest();
|
||||
settingsRequest.indices(index);
|
||||
GetSettingsResponse settingsResponse = client.execute(GetSettingsAction.INSTANCE, settingsRequest).actionGet();
|
||||
int shards = settingsResponse.getIndexToSettings().get(index).getAsInt("index.number_of_shards", -1);
|
||||
if (shards > 0) {
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
.indices(index)
|
||||
.waitForActiveShards(shards)
|
||||
.timeout(timeout);
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
logger.error("timeout waiting for recovery");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureActive();
|
||||
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureActive();
|
||||
try {
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
|
||||
new ClusterHealthRequest().timeout(timeout)).actionGet();
|
||||
ClusterHealthStatus status = healthResponse.getStatus();
|
||||
return status.name();
|
||||
} catch (ElasticsearchTimeoutException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return "TIMEOUT";
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return "DISCONNECTED";
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return "[" + e.getMessage() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
|
||||
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
|
||||
return updateReplicaLevel(indexDefinition.getFullIndexName(), level,
|
||||
indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException {
|
||||
waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations
|
||||
if (level > 0) {
|
||||
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
|
||||
waitForRecovery(index, maxWaitTime, timeUnit);
|
||||
if (level < 1) {
|
||||
logger.warn("invalid replica level");
|
||||
return this;
|
||||
}
|
||||
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
|
||||
waitForShards(maxWaitTime, timeUnit);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -510,27 +195,9 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
return replica;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient flushIndex(String index) {
|
||||
if (index != null) {
|
||||
ensureActive();
|
||||
client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient refreshIndex(String index) {
|
||||
if (index != null) {
|
||||
ensureActive();
|
||||
client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String resolveMostRecentIndex(String alias) {
|
||||
ensureActive();
|
||||
ensureClientIsPresent();
|
||||
if (alias == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -558,7 +225,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
|
||||
@Override
|
||||
public String resolveAlias(String alias) {
|
||||
ensureActive();
|
||||
ensureClientIsPresent();
|
||||
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.blocks(false);
|
||||
clusterStateRequest.metaData(true);
|
||||
|
@ -569,7 +236,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet();
|
||||
SortedMap<String, AliasOrIndex> map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup();
|
||||
AliasOrIndex aliasOrIndex = map.get(alias);
|
||||
return aliasOrIndex != null ? aliasOrIndex.getIndices().iterator().next().getIndex().getName() : null;
|
||||
return aliasOrIndex != null ? aliasOrIndex.getIndices().iterator().next().getIndex() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -600,7 +267,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
@Override
|
||||
public IndexShiftResult shiftIndex(String index, String fullIndexName,
|
||||
List<String> additionalAliases, IndexAliasAdder adder) {
|
||||
ensureActive();
|
||||
ensureClientIsPresent();
|
||||
if (index == null) {
|
||||
return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to
|
||||
}
|
||||
|
@ -616,8 +283,8 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
final List<String> moveAliases = new ArrayList<>();
|
||||
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
|
||||
if (oldAliasMap == null || !oldAliasMap.containsKey(index)) {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(index));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, index));
|
||||
newAliases.add(index);
|
||||
}
|
||||
// move existing aliases
|
||||
|
@ -625,14 +292,14 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
for (Map.Entry<String, String> entry : oldAliasMap.entrySet()) {
|
||||
String alias = entry.getKey();
|
||||
String filter = entry.getValue();
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove()
|
||||
.indices(oldIndex).alias(alias));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.REMOVE,
|
||||
oldIndex, alias));
|
||||
if (filter != null) {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(alias).filter(filter));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, alias).filter(filter));
|
||||
} else {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(alias));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, alias));
|
||||
}
|
||||
moveAliases.add(alias);
|
||||
}
|
||||
|
@ -645,20 +312,20 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
if (adder != null) {
|
||||
adder.addIndexAlias(indicesAliasesRequest, fullIndexName, additionalAlias);
|
||||
} else {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(additionalAlias));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, additionalAlias));
|
||||
}
|
||||
newAliases.add(additionalAlias);
|
||||
} else {
|
||||
String filter = oldAliasMap.get(additionalAlias);
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove()
|
||||
.indices(oldIndex).alias(additionalAlias));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.REMOVE,
|
||||
oldIndex, additionalAlias));
|
||||
if (filter != null) {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(additionalAlias).filter(filter));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, additionalAlias).filter(filter));
|
||||
} else {
|
||||
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(fullIndexName).alias(additionalAlias));
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, additionalAlias));
|
||||
}
|
||||
moveAliases.add(additionalAlias);
|
||||
}
|
||||
|
@ -668,8 +335,8 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
logger.debug("indices alias request = {}", indicesAliasesRequest.getAliasActions().toString());
|
||||
IndicesAliasesResponse indicesAliasesResponse =
|
||||
client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet();
|
||||
logger.debug("response isAcknowledged = {} isFragment = {}",
|
||||
indicesAliasesResponse.isAcknowledged(), indicesAliasesResponse.isFragment());
|
||||
logger.debug("response isAcknowledged = {}",
|
||||
indicesAliasesResponse.isAcknowledged());
|
||||
}
|
||||
return new SuccessIndexShiftResult(moveAliases, newAliases);
|
||||
}
|
||||
|
@ -688,7 +355,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
if (index.equals(fullIndexName)) {
|
||||
return EMPTY_INDEX_PRUNE_RESULT;
|
||||
}
|
||||
ensureActive();
|
||||
ensureClientIsPresent();
|
||||
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
|
||||
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
|
||||
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
|
||||
|
@ -733,11 +400,11 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
|
||||
@Override
|
||||
public Long mostRecentDocument(String index, String timestampfieldname) {
|
||||
ensureActive();
|
||||
SortBuilder<?> sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
|
||||
ensureClientIsPresent();
|
||||
SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder();
|
||||
builder.sort(sort);
|
||||
builder.storedField(timestampfieldname);
|
||||
builder.field(timestampfieldname);
|
||||
builder.size(1);
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices(index);
|
||||
|
@ -820,24 +487,10 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
|
||||
@Override
|
||||
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
|
||||
ensureActive();
|
||||
ensureClientIsPresent();
|
||||
if (index == null) {
|
||||
throw new IOException("no index name given");
|
||||
}
|
||||
try {
|
||||
URL url = new URL(string);
|
||||
try (InputStream inputStream = url.openStream()) {
|
||||
Settings settings = Settings.builder().loadFromStream(string, inputStream, true).build();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
settings.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
return Strings.toString(builder);
|
||||
}
|
||||
} catch (MalformedURLException e) {
|
||||
return string;
|
||||
}
|
||||
if (value == null) {
|
||||
throw new IOException("no value given");
|
||||
}
|
||||
Settings.Builder updateSettingsBuilder = Settings.builder();
|
||||
updateSettingsBuilder.put(key, value.toString());
|
||||
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
|
||||
|
@ -845,26 +498,41 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
||||
}
|
||||
|
||||
private void ensureActive() {
|
||||
if (this instanceof MockExtendedClient) {
|
||||
return;
|
||||
private static String findSettingsFrom(String string) throws IOException {
|
||||
if (string == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
URL url = new URL(string);
|
||||
try (InputStream inputStream = url.openStream()) {
|
||||
Settings settings = Settings.builder().loadFromStream(string, inputStream).build();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
settings.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
return builder.string();
|
||||
}
|
||||
} catch (MalformedURLException e) {
|
||||
return string;
|
||||
}
|
||||
}
|
||||
|
||||
private static String findMappingsFrom(String string) throws IOException {
|
||||
if (string == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
URL url = new URL(string);
|
||||
try (InputStream inputStream = url.openStream()) {
|
||||
if (string.endsWith(".json")) {
|
||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered();
|
||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(inputStream).mapOrdered();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.startObject().map(mappings).endObject();
|
||||
return Strings.toString(builder);
|
||||
return builder.string();
|
||||
}
|
||||
if (string.endsWith(".yml") || string.endsWith(".yaml")) {
|
||||
Map<String, ?> mappings = YamlXContent.yamlXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered();
|
||||
Map<String, ?> mappings = YamlXContent.yamlXContent.createParser(inputStream).mapOrdered();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.startObject().map(mappings).endObject();
|
||||
return Strings.toString(builder);
|
||||
return builder.string();
|
||||
}
|
||||
}
|
||||
return string;
|
||||
|
@ -873,12 +541,6 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
}
|
||||
}
|
||||
|
||||
private void ensureIndexGiven(String index) {
|
||||
if (index == null) {
|
||||
throw new IllegalArgumentException("no index given");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> getFilters(GetAliasesResponse getAliasesResponse) {
|
||||
Map<String, String> result = new HashMap<>();
|
||||
for (ObjectObjectCursor<String, List<AliasMetaData>> object : getAliasesResponse.getAliases()) {
|
||||
|
@ -896,7 +558,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
}
|
||||
|
||||
public void checkMapping(String index) {
|
||||
ensureActive();
|
||||
ensureClientIsPresent();
|
||||
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
|
||||
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
|
||||
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
|
||||
|
|
|
@ -13,12 +13,9 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -27,7 +24,6 @@ import org.xbib.elx.api.BulkClient;
|
|||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
@ -91,10 +87,9 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
@Override
|
||||
public void newIndex(IndexDefinition indexDefinition) throws IOException {
|
||||
Settings settings = indexDefinition.getSettings() == null ? null :
|
||||
Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build();
|
||||
Settings.builder().loadFromSource(indexDefinition.getSettings()).build();
|
||||
Map<String, ?> mappings = indexDefinition.getMappings() == null ? null :
|
||||
JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered();
|
||||
JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered();
|
||||
newIndex(indexDefinition.getFullIndexName(), settings, mappings);
|
||||
}
|
||||
|
||||
|
@ -110,9 +105,8 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
|
||||
String mappingString = Strings.toString(builder);
|
||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingString).mapOrdered();
|
||||
String mappingString = builder.string();
|
||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(mappingString).mapOrdered();
|
||||
newIndex(index, settings, mappings);
|
||||
}
|
||||
|
||||
|
@ -134,7 +128,7 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
logger.info("index {} created: {}", index,
|
||||
Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS)));
|
||||
createIndexResponse.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -169,20 +163,18 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient index(String index, String id, boolean create, String source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
|
||||
return index(index, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkClient index(String index, String id, boolean create, BytesReference source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source, XContentType.JSON));
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkClient index(IndexRequest indexRequest) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.index(indexRequest);
|
||||
bulkController.bulkIndex(indexRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -194,7 +186,7 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
@Override
|
||||
public BulkClient delete(DeleteRequest deleteRequest) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.delete(deleteRequest);
|
||||
bulkController.bulkDelete(deleteRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -213,14 +205,14 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
@Override
|
||||
public BulkClient update(UpdateRequest updateRequest) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.update(updateRequest);
|
||||
bulkController.bulkUpdate(updateRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
|
||||
ensureClientIsPresent();
|
||||
return bulkController.waitForResponses(timeout, timeUnit);
|
||||
return bulkController.waitForBulkResponses(timeout, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -13,9 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
|
@ -28,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.xbib.elx.api.NativeClient;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -38,7 +34,7 @@ public abstract class AbstractNativeClient implements NativeClient {
|
|||
|
||||
/**
|
||||
* The one and only index type name used in the extended client.
|
||||
* Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
|
||||
* Note that all Elasticsearch versions before 6.2.0 do not allow a prepending "_".
|
||||
*/
|
||||
protected static final String TYPE_NAME = "doc";
|
||||
|
||||
|
@ -108,6 +104,23 @@ public abstract class AbstractNativeClient implements NativeClient {
|
|||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureClientIsPresent();
|
||||
logger.info("waiting for cluster shard settling");
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
//.waitForActiveShards(0)
|
||||
.waitForRelocatingShards(0)
|
||||
.timeout(timeout);
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse.isTimedOut()) {
|
||||
String message = "timeout waiting for cluster shards";
|
||||
logger.error(message);
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
|
||||
|
@ -130,16 +143,6 @@ public abstract class AbstractNativeClient implements NativeClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ?> getMapping(String index, String mapping) {
|
||||
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
||||
.setIndices(index)
|
||||
.setTypes(mapping);
|
||||
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
|
||||
logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap());
|
||||
return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSearchableDocs(String index) {
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||
|
@ -152,7 +155,7 @@ public abstract class AbstractNativeClient implements NativeClient {
|
|||
@Override
|
||||
public boolean isIndexExists(String index) {
|
||||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
|
||||
indicesExistsRequest.indices(index);
|
||||
indicesExistsRequest.indices(new String[]{index});
|
||||
IndicesExistsResponse indicesExistsResponse =
|
||||
client.execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
|
||||
return indicesExistsResponse.isExists();
|
||||
|
|
|
@ -2,9 +2,6 @@ package org.xbib.elx.common;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
|
@ -13,6 +10,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.xbib.elx.api.BulkClient;
|
||||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkListener;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.BulkProcessor;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
|
@ -47,11 +45,9 @@ public class DefaultBulkController implements BulkController {
|
|||
|
||||
private BulkListener bulkListener;
|
||||
|
||||
private AtomicBoolean active;
|
||||
private final AtomicBoolean active;
|
||||
|
||||
private boolean enableBulkLogging;
|
||||
|
||||
public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
|
||||
public DefaultBulkController(BulkClient client, BulkMetric bulkMetric) {
|
||||
this.client = client;
|
||||
this.bulkMetric = bulkMetric;
|
||||
this.indexNames = new ArrayList<>();
|
||||
|
@ -62,6 +58,11 @@ public class DefaultBulkController implements BulkController {
|
|||
this.maxWaitTimeUnit = TimeUnit.SECONDS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkMetric getBulkMetric() {
|
||||
return bulkMetric;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable getLastBulkError() {
|
||||
return bulkListener.getLastBulkError();
|
||||
|
@ -78,9 +79,9 @@ public class DefaultBulkController implements BulkController {
|
|||
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(),
|
||||
"maxVolumePerRequest"));
|
||||
this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
|
||||
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(),
|
||||
Parameters.ENABLE_BULK_LOGGING.getValue());
|
||||
this.bulkListener = new BulkListener();
|
||||
this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging);
|
||||
this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener)
|
||||
.setBulkActions(maxActionsPerRequest)
|
||||
.setConcurrentRequests(maxConcurrentRequests)
|
||||
|
@ -96,6 +97,11 @@ public class DefaultBulkController implements BulkController {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inactivate() {
|
||||
this.active.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
|
||||
startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(),
|
||||
|
@ -118,65 +124,49 @@ public class DefaultBulkController implements BulkController {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void index(IndexRequest indexRequest) {
|
||||
public void bulkIndex(IndexRequest indexRequest) {
|
||||
ensureActiveAndBulk();
|
||||
if (!active.get()) {
|
||||
throw new IllegalStateException("inactive");
|
||||
}
|
||||
try {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
|
||||
}
|
||||
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id());
|
||||
bulkProcessor.add(indexRequest);
|
||||
} catch (Exception e) {
|
||||
bulkListener.lastBulkError = e;
|
||||
active.set(false);
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("bulk add of index failed: " + e.getMessage(), e);
|
||||
}
|
||||
inactivate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(DeleteRequest deleteRequest) {
|
||||
if (!active.get()) {
|
||||
throw new IllegalStateException("inactive");
|
||||
}
|
||||
public void bulkDelete(DeleteRequest deleteRequest) {
|
||||
ensureActiveAndBulk();
|
||||
try {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
|
||||
}
|
||||
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
|
||||
bulkProcessor.add(deleteRequest);
|
||||
} catch (Exception e) {
|
||||
bulkListener.lastBulkError = e;
|
||||
active.set(false);
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("bulk add of delete failed: " + e.getMessage(), e);
|
||||
}
|
||||
inactivate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(UpdateRequest updateRequest) {
|
||||
if (!active.get()) {
|
||||
throw new IllegalStateException("inactive");
|
||||
}
|
||||
public void bulkUpdate(UpdateRequest updateRequest) {
|
||||
ensureActiveAndBulk();
|
||||
try {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
|
||||
}
|
||||
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id());
|
||||
bulkProcessor.add(updateRequest);
|
||||
} catch (Exception e) {
|
||||
bulkListener.lastBulkError = e;
|
||||
active.set(false);
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("bulk add of update failed: " + e.getMessage(), e);
|
||||
}
|
||||
inactivate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
|
||||
public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
|
||||
try {
|
||||
return bulkProcessor.awaitFlush(timeout, timeUnit);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -195,7 +185,7 @@ public class DefaultBulkController implements BulkController {
|
|||
@Override
|
||||
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
|
||||
flush();
|
||||
if (waitForResponses(timeout, timeUnit)) {
|
||||
if (waitForBulkResponses(timeout, timeUnit)) {
|
||||
if (indexNames.contains(index)) {
|
||||
Long secs = stopBulkRefreshIntervals.get(index);
|
||||
if (secs != null && secs != 0L) {
|
||||
|
@ -243,87 +233,4 @@ public class DefaultBulkController implements BulkController {
|
|||
}
|
||||
}
|
||||
|
||||
private class BulkListener implements DefaultBulkProcessor.Listener {
|
||||
|
||||
private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
|
||||
|
||||
private Throwable lastBulkError = null;
|
||||
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {
|
||||
long l = 0;
|
||||
if (bulkMetric != null) {
|
||||
l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().inc();
|
||||
int n = request.numberOfActions();
|
||||
bulkMetric.getSubmitted().inc(n);
|
||||
bulkMetric.getCurrentIngestNumDocs().inc(n);
|
||||
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
|
||||
}
|
||||
if (enableBulkLogging && logger.isDebugEnabled()) {
|
||||
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
|
||||
executionId,
|
||||
request.numberOfActions(),
|
||||
request.estimatedSizeInBytes(),
|
||||
l);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
long l = 0;
|
||||
if (bulkMetric != null) {
|
||||
l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
bulkMetric.getSucceeded().inc(response.getItems().length);
|
||||
}
|
||||
int n = 0;
|
||||
for (BulkItemResponse itemResponse : response.getItems()) {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
|
||||
}
|
||||
if (itemResponse.isFailed()) {
|
||||
n++;
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getSucceeded().dec(1);
|
||||
bulkMetric.getFailed().inc(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (enableBulkLogging && logger.isDebugEnabled() && bulkMetric != null) {
|
||||
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
|
||||
executionId,
|
||||
bulkMetric.getSucceeded().getCount(),
|
||||
bulkMetric.getFailed().getCount(),
|
||||
response.getTook().millis(),
|
||||
l);
|
||||
}
|
||||
if (n > 0) {
|
||||
if (enableBulkLogging && logger.isErrorEnabled()) {
|
||||
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
|
||||
executionId, n, response.buildFailureMessage());
|
||||
}
|
||||
} else {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
if (bulkMetric != null) {
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
}
|
||||
lastBulkError = failure;
|
||||
active.set(false);
|
||||
if (enableBulkLogging && logger.isErrorEnabled()) {
|
||||
logger.error("after bulk [" + executionId + "] error", failure);
|
||||
}
|
||||
}
|
||||
|
||||
Throwable getLastBulkError() {
|
||||
return lastBulkError;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import org.xbib.elx.api.BulkController;
|
|||
import org.xbib.elx.api.BulkListener;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
|
||||
class DefaultBulkListener implements BulkListener {
|
||||
public class DefaultBulkListener implements BulkListener {
|
||||
|
||||
private final Logger logger = LogManager.getLogger(BulkListener.class.getName());
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
|
@ -36,6 +37,10 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
submitted = new CountMetric();
|
||||
succeeded = new CountMetric();
|
||||
failed = new CountMetric();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) {
|
||||
start();
|
||||
}
|
||||
|
||||
|
|
|
@ -388,7 +388,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
listener.beforeBulk(executionId, bulkRequest);
|
||||
semaphore.acquire();
|
||||
acquired = true;
|
||||
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<BulkResponse>() {
|
||||
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
try {
|
||||
|
|
|
@ -43,8 +43,8 @@ public class MockAdminClient extends AbstractAdminClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
|
||||
return true;
|
||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,6 +26,10 @@ public class MockBulkClient extends AbstractBulkClient {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ElasticsearchClient createClient(Settings settings) {
|
||||
return null;
|
||||
|
|
|
@ -3,6 +3,8 @@ package org.xbib.elx.common;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* A mocked client, it does not perform any actions on a cluster. Useful for testing.
|
||||
*/
|
||||
|
@ -22,6 +24,10 @@ public class MockSearchClient extends AbstractSearchClient {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ElasticsearchClient createClient(Settings settings) {
|
||||
return null;
|
||||
|
|
|
@ -2,8 +2,8 @@ package org.xbib.elx.node;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -40,9 +40,9 @@ public class NodeBulkClient extends AbstractBulkClient {
|
|||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
|
||||
logger.info("creating node client on {} with effective settings {}",
|
||||
version, Strings.toString(builder));
|
||||
version, builder.string());
|
||||
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
|
||||
this.node = new BulkNode(new Environment(effectiveSettings, null), plugins);
|
||||
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
|
||||
try {
|
||||
node.start();
|
||||
} catch (Exception e) {
|
||||
|
@ -62,7 +62,7 @@ public class NodeBulkClient extends AbstractBulkClient {
|
|||
private static class BulkNode extends Node {
|
||||
|
||||
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
super(env, classpathPlugins);
|
||||
super(env, Version.CURRENT, classpathPlugins);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ package org.xbib.elx.node;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -40,9 +40,9 @@ public class NodeSearchClient extends AbstractSearchClient {
|
|||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
|
||||
logger.info("creating node client on {} with effective settings {}",
|
||||
version, Strings.toString(builder));
|
||||
version, builder.string());
|
||||
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
|
||||
this.node = new BulkNode(new Environment(effectiveSettings, null), plugins);
|
||||
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
|
||||
try {
|
||||
node.start();
|
||||
} catch (Exception e) {
|
||||
|
@ -62,7 +62,7 @@ public class NodeSearchClient extends AbstractSearchClient {
|
|||
private static class BulkNode extends Node {
|
||||
|
||||
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
super(env, classpathPlugins);
|
||||
super(env, Version.CURRENT, classpathPlugins);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.node.NodeAdminClient;
|
||||
import org.xbib.elx.node.NodeAdminClientProvider;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||
|
||||
|
@ -71,22 +73,25 @@ class BulkClientTest {
|
|||
|
||||
@Test
|
||||
void testMapping() throws Exception {
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.build();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.startObject("doc")
|
||||
.startObject("properties")
|
||||
.startObject("location")
|
||||
.field("type", "geo_point")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||
assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties"));
|
||||
bulkClient.close();
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.build()) {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.startObject("doc")
|
||||
.startObject("properties")
|
||||
.startObject("location")
|
||||
.field("type", "geo_point")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||
assertTrue(adminClient.getMapping("test", "doc").containsKey("properties"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -33,35 +33,22 @@ class DuplicateIDTest {
|
|||
@Test
|
||||
void testDuplicateDocIDs() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try {
|
||||
client.newIndex("test");
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
client.index("test", helper.randomString(1), false,
|
||||
bulkClient.index("test", helper.randomString(1), false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
client.refreshIndex("test");
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder();
|
||||
builder.query(QueryBuilders.matchAllQuery());
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices("test");
|
||||
searchRequest.types("test");
|
||||
searchRequest.source(builder);
|
||||
long hits = helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits();
|
||||
logger.info("hits = {}", hits);
|
||||
assertTrue(hits < ACTIONS);
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -11,13 +11,11 @@ import org.xbib.elx.node.NodeAdminClient;
|
|||
import org.xbib.elx.node.NodeAdminClientProvider;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -35,13 +33,12 @@ class IndexPruneTest {
|
|||
|
||||
@Test
|
||||
void testPrune() throws IOException {
|
||||
final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.build();
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
|
@ -62,20 +59,13 @@ class IndexPruneTest {
|
|||
assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4"));
|
||||
List<Boolean> list = new ArrayList<>();
|
||||
for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) {
|
||||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
|
||||
indicesExistsRequest.indices(index);
|
||||
IndicesExistsResponse indicesExistsResponse =
|
||||
client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
|
||||
list.add(indicesExistsResponse.isExists());
|
||||
list.add(adminClient.isIndexExists(index));
|
||||
}
|
||||
logger.info(list);
|
||||
assertFalse(list.get(0));
|
||||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
} finally {
|
||||
adminClient.close();
|
||||
bulkClient.close();
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.xbib.elx.node.test;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.cluster.metadata.AliasAction;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -13,11 +14,9 @@ import org.xbib.elx.node.NodeAdminClient;
|
|||
import org.xbib.elx.node.NodeAdminClientProvider;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -34,55 +33,50 @@ class IndexShiftTest {
|
|||
|
||||
@Test
|
||||
void testIndexShift() throws Exception {
|
||||
final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.build();
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
client.newIndex("test_shift", settings);
|
||||
bulkClient.newIndex("test_shift", settings);
|
||||
for (int i = 0; i < 1; i++) {
|
||||
client.index("test_shift", helper.randomString(1), false,
|
||||
bulkClient.index("test_shift", helper.randomString(1), false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
IndexShiftResult indexShiftResult =
|
||||
client.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c"));
|
||||
adminClient.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("a"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("b"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("c"));
|
||||
assertTrue(indexShiftResult.getMovedAliases().isEmpty());
|
||||
|
||||
Map<String, String> aliases = client.getAliases("test_shift");
|
||||
Map<String, String> aliases = adminClient.getAliases("test_shift");
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("test"));
|
||||
|
||||
String resolved = client.resolveAlias("test");
|
||||
aliases = client.getAliases(resolved);
|
||||
String resolved = adminClient.resolveAlias("test");
|
||||
aliases = adminClient.getAliases(resolved);
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("test"));
|
||||
|
||||
client.newIndex("test_shift2", settings);
|
||||
bulkClient.newIndex("test_shift2", settings);
|
||||
for (int i = 0; i < 1; i++) {
|
||||
client.index("test_shift2", helper.randomString(1), false,
|
||||
bulkClient.index("test_shift2", helper.randomString(1), false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
|
||||
indexShiftResult = client.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
indexShiftResult = adminClient.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
);
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("d"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("e"));
|
||||
|
@ -90,26 +84,21 @@ class IndexShiftTest {
|
|||
assertTrue(indexShiftResult.getMovedAliases().contains("a"));
|
||||
assertTrue(indexShiftResult.getMovedAliases().contains("b"));
|
||||
assertTrue(indexShiftResult.getMovedAliases().contains("c"));
|
||||
|
||||
aliases = client.getAliases("test_shift2");
|
||||
aliases = adminClient.getAliases("test_shift2");
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
|
||||
resolved = client.resolveAlias("test");
|
||||
aliases = client.getAliases(resolved);
|
||||
resolved = adminClient.resolveAlias("test");
|
||||
aliases = adminClient.getAliases(resolved);
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
} finally {
|
||||
adminClient.close();
|
||||
bulkClient.close();
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
package org.xbib.elx.node.test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||
import org.xbib.elx.node.NodeSearchClient;
|
||||
import org.xbib.elx.node.NodeSearchClientProvider;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
class SearchTest {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||
.build()) {
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMinutes(1), 10);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
final AtomicInteger idcount = new AtomicInteger();
|
||||
ids.forEach(id -> {
|
||||
logger.info(id);
|
||||
idcount.incrementAndGet();
|
||||
});
|
||||
assertEquals(numactions, idcount.get());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,56 +30,44 @@ class SmokeTest {
|
|||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.build();
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.build();
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
try {
|
||||
assertEquals(helper.getClusterName(), client.getClusterName());
|
||||
client.newIndex("test_smoke");
|
||||
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
client.flush();
|
||||
client.waitForResponses(30, TimeUnit.SECONDS);
|
||||
client.checkMapping("test_smoke");
|
||||
client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
|
||||
client.delete("test_smoke", "1");
|
||||
client.flush();
|
||||
client.waitForResponses(30, TimeUnit.SECONDS);
|
||||
client.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS);
|
||||
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
client.delete("test_smoke", "1");
|
||||
client.flush();
|
||||
client.deleteIndex("test_smoke");
|
||||
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke", Settings.builder()
|
||||
.build());
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
assertEquals(0, indexDefinition.getReplicaLevel());
|
||||
client.newIndex(indexDefinition);
|
||||
client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
client.flush();
|
||||
client.waitForResponses(30, TimeUnit.SECONDS);
|
||||
client.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = client.getReplicaLevel(indexDefinition);
|
||||
assertEquals(helper.getClusterName(), adminClient.getClusterName());
|
||||
bulkClient.newIndex("test_smoke");
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.checkMapping("test_smoke");
|
||||
bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
|
||||
bulkClient.delete("test_smoke", "1");
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
bulkClient.delete("test_smoke", "1");
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.deleteIndex("test_smoke");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
client.deleteIndex(indexDefinition);
|
||||
//assertEquals(0, client.getBulkMetric().getFailed().getCount());
|
||||
//assertEquals(6, client.getBulkMetric().getSucceeded().getCount());
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
bulkClient.close();
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
// close admin after bulk
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
adminClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.xbib.elx.transport;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -10,48 +9,47 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.netty4.Netty4Plugin;
|
||||
import org.jboss.netty.channel.DefaultChannelFuture;
|
||||
import org.xbib.elx.common.util.NetworkUtils;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class TransportClientHelper {
|
||||
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportAdminClient.class.getName());
|
||||
|
||||
protected ElasticsearchClient createClient(Settings settings) throws IOException {
|
||||
protected ElasticsearchClient createClient(Settings settings) {
|
||||
if (settings != null) {
|
||||
String systemIdentifier = System.getProperty("os.name")
|
||||
+ " " + System.getProperty("java.vm.name")
|
||||
+ " " + System.getProperty("java.vm.vendor")
|
||||
+ " " + System.getProperty("java.vm.version")
|
||||
+ " Elasticsearch " + Version.CURRENT.toString();
|
||||
Settings transportClientSettings = getTransportClientSettings(settings);
|
||||
//XContentBuilder settingsBuilder = XContentFactory.jsonBuilder().startObject();
|
||||
XContentBuilder effectiveSettingsBuilder = XContentFactory.jsonBuilder().startObject();
|
||||
logger.log(Level.INFO, "creating transport client on {} with settings {}",
|
||||
systemIdentifier,
|
||||
//Strings.toString(settings.toXContent(settingsBuilder, ToXContent.EMPTY_PARAMS).endObject()),
|
||||
Strings.toString(transportClientSettings.toXContent(effectiveSettingsBuilder,
|
||||
ToXContent.EMPTY_PARAMS).endObject()));
|
||||
return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class));
|
||||
Settings effectiveSettings = Settings.builder()
|
||||
// for thread pool size
|
||||
.put("processors",
|
||||
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
|
||||
.put("client.transport.sniff", false) // do not sniff
|
||||
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
|
||||
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
|
||||
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
|
||||
// custom settings may override defaults
|
||||
.put(settings)
|
||||
.build();
|
||||
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
|
||||
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
|
||||
|
||||
// we need to disable dead lock check because we may have mixed node/transport clients
|
||||
DefaultChannelFuture.setUseDeadLockChecker(false);
|
||||
return TransportClient.builder().settings(effectiveSettings).build();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -64,33 +62,17 @@ public class TransportClientHelper {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedTransportClient init(Settings settings) throws IOException {
|
||||
super.init(settings);
|
||||
// additional auto-connect
|
||||
try {
|
||||
Collection<TransportAddress> addrs = findAddresses(settings);
|
||||
if (!connect(addrs, settings.getAsBoolean("autodiscover", false))) {
|
||||
throw new NoNodeAvailableException("no cluster nodes available, check settings "
|
||||
+ settings.toString());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private Collection<TransportAddress> findAddresses(Settings settings) throws IOException {
|
||||
final int defaultPort = settings.getAsInt("port", 9300);
|
||||
Collection<TransportAddress> addresses = new ArrayList<>();
|
||||
for (String hostname : settings.getAsList("host")) {
|
||||
for (String hostname : settings.getAsArray("host")) {
|
||||
String[] splitHost = hostname.split(":", 2);
|
||||
if (splitHost.length == 2) {
|
||||
try {
|
||||
String host = splitHost[0];
|
||||
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
|
||||
int port = Integer.parseInt(splitHost[1]);
|
||||
TransportAddress address = new TransportAddress(inetAddress, port);
|
||||
TransportAddress address = new InetSocketTransportAddress(inetAddress, port);
|
||||
addresses.add(address);
|
||||
} catch (NumberFormatException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
|
@ -99,18 +81,14 @@ public class TransportClientHelper {
|
|||
if (splitHost.length == 1) {
|
||||
String host = splitHost[0];
|
||||
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
|
||||
TransportAddress address = new TransportAddress(inetAddress, defaultPort);
|
||||
TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort);
|
||||
addresses.add(address);
|
||||
}
|
||||
}
|
||||
return addresses;
|
||||
}
|
||||
|
||||
private boolean connect(Collection<TransportAddress> addresses, boolean autodiscover) {
|
||||
if (getClient() == null) {
|
||||
throw new IllegalStateException("no client present");
|
||||
}
|
||||
TransportClient transportClient = (TransportClient) getClient();
|
||||
private boolean connect(TransportClient transportClient, Collection<TransportAddress> addresses, boolean autodiscover) {
|
||||
for (TransportAddress address : addresses) {
|
||||
transportClient.addTransportAddresses(address);
|
||||
}
|
||||
|
@ -131,31 +109,9 @@ public class TransportClientHelper {
|
|||
return false;
|
||||
}
|
||||
|
||||
private Settings getTransportClientSettings(Settings settings) {
|
||||
return Settings.builder()
|
||||
// "cluster.name"
|
||||
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(),
|
||||
settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey()))
|
||||
// "processors"
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(),
|
||||
settings.get(EsExecutors.PROCESSORS_SETTING.getKey(),
|
||||
String.valueOf(Runtime.getRuntime().availableProcessors())))
|
||||
// "transport.type"
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY,
|
||||
Netty4Plugin.NETTY_TRANSPORT_NAME)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void addDiscoveryNodes(TransportClient transportClient, DiscoveryNodes discoveryNodes) {
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
transportClient.addTransportAddress(discoveryNode.getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
static class MyTransportClient extends TransportClient {
|
||||
|
||||
MyTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
|
||||
super(settings, plugins);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,8 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.transport.TransportAdminClient;
|
||||
import org.xbib.elx.transport.TransportAdminClientProvider;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||
|
||||
|
@ -66,7 +68,6 @@ class BulkClientTest {
|
|||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
|
||||
.build();
|
||||
bulkClient.newIndex("test");
|
||||
bulkClient.close();
|
||||
|
@ -74,23 +75,27 @@ class BulkClientTest {
|
|||
|
||||
@Test
|
||||
void testMapping() throws Exception {
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.startObject("doc")
|
||||
.startObject("properties")
|
||||
.startObject("location")
|
||||
.field("type", "geo_point")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||
assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties"));
|
||||
bulkClient.close();
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.startObject("doc")
|
||||
.startObject("properties")
|
||||
.startObject("location")
|
||||
.field("type", "geo_point")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||
assertTrue(adminClient.getMapping("test", "doc").containsKey("properties"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -2,21 +2,13 @@ package org.xbib.elx.transport.test;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -26,9 +18,9 @@ class DuplicateIDTest {
|
|||
|
||||
private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName());
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long ACTIONS = 5L;
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
|
@ -39,39 +31,23 @@ class DuplicateIDTest {
|
|||
@Test
|
||||
void testDuplicateDocIDs() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
try {
|
||||
bulkClient.newIndex("test_dup");
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test_dup", helper.randomString(1), false,
|
||||
bulkClient.index("test", helper.randomString(1), false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
client.refreshIndex("test_dup");
|
||||
SearchSourceBuilder builder = new SearchSourceBuilder();
|
||||
builder.query(QueryBuilders.matchAllQuery());
|
||||
builder.size(0);
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices("test_dup");
|
||||
searchRequest.types("test_dup");
|
||||
searchRequest.source(builder);
|
||||
SearchResponse searchResponse =
|
||||
helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet();
|
||||
long hits = searchResponse.getHits().getTotalHits();
|
||||
logger.info("hits = {}", hits);
|
||||
assertTrue(hits < ACTIONS);
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -35,15 +35,14 @@ class IndexPruneTest {
|
|||
|
||||
@Test
|
||||
void testPrune() throws IOException {
|
||||
final TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
try {
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
|
@ -64,20 +63,13 @@ class IndexPruneTest {
|
|||
assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4"));
|
||||
List<Boolean> list = new ArrayList<>();
|
||||
for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) {
|
||||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
|
||||
indicesExistsRequest.indices(index);
|
||||
IndicesExistsResponse indicesExistsResponse =
|
||||
client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
|
||||
list.add(indicesExistsResponse.isExists());
|
||||
list.add(adminClient.isIndexExists(index));
|
||||
}
|
||||
logger.info(list);
|
||||
assertFalse(list.get(0));
|
||||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
} finally {
|
||||
adminClient.close();
|
||||
bulkClient.close();
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package org.xbib.elx.transport.test;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.cluster.metadata.AliasAction;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -32,55 +33,54 @@ class IndexShiftTest {
|
|||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test
|
||||
void testIndexShift() throws Exception {
|
||||
final TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
try {
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
client.newIndex("test_shift1234", settings);
|
||||
bulkClient.newIndex("test_shift", settings);
|
||||
for (int i = 0; i < 1; i++) {
|
||||
client.index("test_shift1234", helper.randomString(1), false,
|
||||
bulkClient.index("test_shift", helper.randomString(1), false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
IndexShiftResult indexShiftResult =
|
||||
client.shiftIndex("test_shift", "test_shift1234", Arrays.asList("a", "b", "c"));
|
||||
adminClient.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("a"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("b"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("c"));
|
||||
assertTrue(indexShiftResult.getMovedAliases().isEmpty());
|
||||
Map<String, String> aliases = client.getAliases("test_shift1234");
|
||||
Map<String, String> aliases = adminClient.getAliases("test_shift");
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("test_shift"));
|
||||
String resolved = adminClient.resolveAlias("test_shift");
|
||||
assertTrue(aliases.containsKey("test"));
|
||||
String resolved = adminClient.resolveAlias("test");
|
||||
aliases = adminClient.getAliases(resolved);
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("test_shift"));
|
||||
client.newIndex("test_shift5678", settings);
|
||||
assertTrue(aliases.containsKey("test"));
|
||||
bulkClient.newIndex("test_shift2", settings);
|
||||
for (int i = 0; i < 1; i++) {
|
||||
client.index("test_shift5678", helper.randomString(1), false,
|
||||
bulkClient.index("test_shift2", helper.randomString(1), false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
client.flush();
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
indexShiftResult = client.shiftIndex("test_shift", "test_shift5678", Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add()
|
||||
.index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
indexShiftResult = adminClient.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"),
|
||||
(request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
index, alias).filter(QueryBuilders.termQuery("my_key", alias)))
|
||||
);
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("d"));
|
||||
assertTrue(indexShiftResult.getNewAliases().contains("e"));
|
||||
|
@ -88,14 +88,14 @@ class IndexShiftTest {
|
|||
assertTrue(indexShiftResult.getMovedAliases().contains("a"));
|
||||
assertTrue(indexShiftResult.getMovedAliases().contains("b"));
|
||||
assertTrue(indexShiftResult.getMovedAliases().contains("c"));
|
||||
aliases = client.getAliases("test_shift5678");
|
||||
aliases = adminClient.getAliases("test_shift2");
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
assertTrue(aliases.containsKey("c"));
|
||||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
resolved = adminClient.resolveAlias("test_shift");
|
||||
resolved = adminClient.resolveAlias("test");
|
||||
aliases = adminClient.getAliases(resolved);
|
||||
assertTrue(aliases.containsKey("a"));
|
||||
assertTrue(aliases.containsKey("b"));
|
||||
|
@ -103,9 +103,6 @@ class IndexShiftTest {
|
|||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
} finally {
|
||||
adminClient.close();
|
||||
bulkClient.close();
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
package org.xbib.elx.transport.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||
import org.xbib.elx.transport.TransportSearchClient;
|
||||
import org.xbib.elx.transport.TransportSearchClientProvider;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
class SearchTest {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (TransportSearchClient searchClient = ClientBuilder.builder()
|
||||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMinutes(1), 10);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
final AtomicInteger idcount = new AtomicInteger();
|
||||
ids.forEach(id -> {
|
||||
logger.info(id);
|
||||
idcount.incrementAndGet();
|
||||
});
|
||||
assertEquals(numactions, idcount.get());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -28,57 +28,49 @@ class SmokeTest {
|
|||
this.helper = helper;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
final TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
try {
|
||||
assertEquals(helper.getClusterName(), client.getClusterName());
|
||||
client.newIndex("test_smoke");
|
||||
client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
client.flush();
|
||||
client.waitForResponses(30, TimeUnit.SECONDS);
|
||||
client.checkMapping("test_smoke");
|
||||
client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
|
||||
client.delete("test_smoke", "1");
|
||||
client.flush();
|
||||
client.waitForResponses(30, TimeUnit.SECONDS);
|
||||
client.checkMapping("test_smoke");
|
||||
client.deleteIndex("test_smoke");
|
||||
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke", Settings.builder()
|
||||
.build());
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
assertEquals(0, indexDefinition.getReplicaLevel());
|
||||
client.newIndex(indexDefinition);
|
||||
client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
client.flush();
|
||||
client.waitForResponses(30, TimeUnit.SECONDS);
|
||||
client.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = client.getReplicaLevel(indexDefinition);
|
||||
assertEquals(helper.getClusterName(), adminClient.getClusterName());
|
||||
bulkClient.newIndex("test_smoke");
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.checkMapping("test_smoke");
|
||||
bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}");
|
||||
bulkClient.delete("test_smoke", "1");
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
bulkClient.delete("test_smoke", "1");
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.deleteIndex("test_smoke");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
|
||||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
client.deleteIndex(indexDefinition);
|
||||
assertEquals(0, client.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
bulkClient.close();
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(4, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
// close admin after bulk
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
adminClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue