fix smoke, search, mock tests, move getMapping() to admin client

This commit is contained in:
Jörg Prante 2020-05-22 11:25:35 +02:00
parent 4a2cb095df
commit b580187063
24 changed files with 497 additions and 185 deletions

View file

@ -22,6 +22,10 @@ public interface AdminClient extends NativeClient {
*/
IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException;
Map<String, ?> getMapping(String index);
Map<String, ?> getMapping(String index, String type);
/**
* Delete an index.
* @param indexDefinition the index definition
@ -89,17 +93,6 @@ public interface AdminClient extends NativeClient {
*/
boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit);
/**
* Wait for index recovery (after replica change).
*
* @param index index
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return true if wait succeeded, false if wait timed out
*/
boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit);
/**
* Resolve alias.
*

View file

@ -55,7 +55,7 @@ public interface NativeClient extends Closeable {
*/
void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit);
Map<String, ?> getMapping(String index, String mapping);
void waitForShards(long maxWaitTime, TimeUnit timeUnit);
long getSearchableDocs(String index);

View file

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
@ -127,6 +128,21 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
}
};
@Override
public Map<String, ?> getMapping(String index) {
return getMapping(index, TYPE_NAME);
}
@Override
public Map<String, ?> getMapping(String index, String mapping) {
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
.setIndices(index)
.setTypes(mapping);
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap());
return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap();
}
@Override
public AdminClient deleteIndex(IndexDefinition indexDefinition) {
return deleteIndex(indexDefinition.getFullIndexName());
@ -134,56 +150,19 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
@Override
public AdminClient deleteIndex(String index) {
ensureClientIsPresent();
if (index == null) {
logger.warn("no index name given to delete index");
return this;
}
ensureClientIsPresent();
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest()
.indices(index);
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.waitForNoInitializingShards(true)
.waitForNoRelocatingShards(true)
.waitForYellowStatus();
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards";
logger.error(message);
throw new IllegalStateException(message);
}
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
waitForShards(30L, TimeUnit.SECONDS);
return this;
}
@Override
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent();
ensureIndexGiven(index);
GetSettingsRequest settingsRequest = new GetSettingsRequest();
settingsRequest.indices(index);
GetSettingsResponse settingsResponse = client.execute(GetSettingsAction.INSTANCE, settingsRequest).actionGet();
int shards = settingsResponse.getIndexToSettings()
.get(index).getAsInt("index.number_of_shards", -1);
if (shards > 0) {
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.indices(index)
.waitForActiveShards(shards)
.waitForNoInitializingShards(true)
.waitForNoRelocatingShards(true)
.waitForYellowStatus()
.timeout(timeout);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards";
logger.error(message);
}
}
return true;
}
@Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
return updateReplicaLevel(indexDefinition.getFullIndexName(), level,
@ -192,11 +171,12 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
@Override
public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException {
waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations
if (level > 0) {
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
waitForRecovery(index, maxWaitTime, timeUnit);
if (level < 1) {
logger.warn("invalid replica level");
return this;
}
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
waitForShards(maxWaitTime, timeUnit);
return this;
}
@ -556,12 +536,6 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
}
}
private void ensureIndexGiven(String index) {
if (index == null) {
throw new IllegalArgumentException("no index given");
}
}
private Map<String, String> getFilters(GetAliasesResponse getAliasesResponse) {
Map<String, String> result = new HashMap<>();
for (ObjectObjectCursor<String, List<AliasMetaData>> object : getAliasesResponse.getAliases()) {

View file

@ -42,14 +42,15 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
@Override
public void init(Settings settings) throws IOException {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
super.init(settings);
if (bulkMetric == null) {
bulkMetric = new DefaultBulkMetric();
logger.log(Level.INFO, "initializing bulk metric with settings = " + settings.toDelimitedString(','));
bulkMetric.init(settings);
}
if (bulkController == null) {
bulkController = new DefaultBulkController(this, bulkMetric);
logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(','));
bulkController.init(settings);
}
}
@ -120,7 +121,6 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
return;
}
ensureClientIsPresent();
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE);
createIndexRequestBuilder.setIndex(index);
if (settings != null) {
@ -130,10 +130,11 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
// NOTE: addMapping(type, ...) API is very fragile. Use XConteBuilder for safe typing.
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
}
createIndexRequestBuilder.setWaitForActiveShards(1);
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
logger.info("index {} created: {}", index,
Strings.toString(createIndexResponse.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)));
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
waitForShards(30L, TimeUnit.SECONDS);
}
@Override
@ -219,7 +220,9 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
@Override
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
ensureClientIsPresent();
return bulkController.waitForResponses(timeout, timeUnit);
boolean success = bulkController.waitForResponses(timeout, timeUnit);
logger.info("waited for all bulk responses: " + success);
return success;
}
@Override

View file

@ -13,9 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.search.SearchAction;
@ -28,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.xbib.elx.api.NativeClient;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -64,11 +60,10 @@ public abstract class AbstractNativeClient implements NativeClient {
protected abstract void closeClient() throws IOException;
@Override
public void init(Settings settings) throws IOException {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
if (client == null) {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
client = createClient(settings);
}
}
@ -96,15 +91,35 @@ public abstract class AbstractNativeClient implements NativeClient {
@Override
public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent();
logger.info("waiting for cluster status " + statusString);
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) {
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.timeout(timeout)
.waitForStatus(status);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) {
String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name();
if (logger.isErrorEnabled()) {
logger.error(message);
}
logger.error(message);
throw new IllegalStateException(message);
}
}
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent();
logger.info("waiting for cluster shard settling");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.waitForNoInitializingShards(true)
.waitForNoRelocatingShards(true)
.timeout(timeout);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) {
String message = "timeout waiting for cluster shards";
logger.error(message);
throw new IllegalStateException(message);
}
}
@ -130,16 +145,6 @@ public abstract class AbstractNativeClient implements NativeClient {
}
}
@Override
public Map<String, ?> getMapping(String index, String mapping) {
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
.setIndices(index)
.setTypes(mapping);
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap());
return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap();
}
@Override
public long getSearchableDocs(String index) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
@ -158,7 +163,6 @@ public abstract class AbstractNativeClient implements NativeClient {
return indicesExistsResponse.isExists();
}
@Override
public void close() throws IOException {
ensureClientIsPresent();
@ -186,9 +190,6 @@ public abstract class AbstractNativeClient implements NativeClient {
}
protected void ensureClientIsPresent() {
if (this instanceof MockAdminClient) {
return;
}
if (client == null) {
throw new IllegalStateException("no client");
}

View file

@ -47,7 +47,7 @@ public class DefaultBulkController implements BulkController {
private BulkListener bulkListener;
private AtomicBoolean active;
private final AtomicBoolean active;
private boolean enableBulkLogging;

View file

@ -2,13 +2,22 @@ package org.xbib.elx.common;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/
public class MockAdminClient extends AbstractAdminClient {
public class MockAdminClient extends MockNativeClient implements AdminClient {
@Override
public void setClient(ElasticsearchClient client) {
}
@Override
public ElasticsearchClient getClient() {
@ -20,16 +29,42 @@ public class MockAdminClient extends AbstractAdminClient {
}
@Override
protected ElasticsearchClient createClient(Settings settings) {
public String getClusterName() {
return null;
}
@Override
protected void closeClient() {
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
return null;
}
@Override
public MockAdminClient deleteIndex(String index) {
public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException {
return null;
}
@Override
public Map<String, ?> getMapping(String index) {
return null;
}
@Override
public Map<String, ?> getMapping(String index, String type) {
return null;
}
@Override
public AdminClient deleteIndex(IndexDefinition indexDefinition) {
return this;
}
@Override
public AdminClient deleteIndex(String index) {
return this;
}
@Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
return this;
}
@ -38,20 +73,95 @@ public class MockAdminClient extends AbstractAdminClient {
return true;
}
@Override
public String resolveAlias(String alias) {
return null;
}
@Override
public String resolveMostRecentIndex(String alias) {
return null;
}
@Override
public Map<String, String> getAliases(String index) {
return null;
}
@Override
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases) {
return null;
}
@Override
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases, IndexAliasAdder indexAliasAdder) {
return null;
}
@Override
public IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases) {
return null;
}
@Override
public IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases, IndexAliasAdder adder) {
return null;
}
@Override
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return null;
}
@Override
public IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform) {
return null;
}
@Override
public Long mostRecentDocument(String index, String timestampfieldname) throws IOException {
return null;
}
@Override
public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) {
}
@Override
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
return true;
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override
public MockAdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) {
public long getSearchableDocs(String index) {
return 0;
}
@Override
public boolean isIndexExists(String index) {
return false;
}
@Override
public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) {
return this;
}
@Override
public int getReplicaLevel(IndexDefinition indexDefinition) {
return 0;
}
@Override
public int getReplicaLevel(String index) {
return 0;
}
@Override
public boolean forceMerge(IndexDefinition indexDefinition) {
return false;
}
@Override
public void close() {
// nothing to do

View file

@ -4,13 +4,22 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xbib.elx.api.BulkClient;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/
public class MockBulkClient extends AbstractBulkClient {
public class MockBulkClient extends MockNativeClient implements BulkClient {
@Override
public ElasticsearchClient getClient() {
@ -35,6 +44,46 @@ public class MockBulkClient extends AbstractBulkClient {
protected void closeClient() {
}
@Override
public BulkMetric getBulkMetric() {
return null;
}
@Override
public BulkController getBulkController() {
return null;
}
@Override
public void newIndex(String index) throws IOException {
}
@Override
public void newIndex(IndexDefinition indexDefinition) throws IOException {
}
@Override
public void newIndex(String index, Settings settings) throws IOException {
}
@Override
public void newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException {
}
@Override
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
}
@Override
public BulkClient index(String index, String id, boolean create, BytesReference source) {
return null;
}
@Override
public MockBulkClient index(String index, String id, boolean create, String source) {
return this;
@ -60,15 +109,30 @@ public class MockBulkClient extends AbstractBulkClient {
return this;
}
@Override
public BulkClient update(String index, String id, BytesReference source) {
return null;
}
@Override
public MockBulkClient update(UpdateRequest updateRequest) {
return this;
}
@Override
public void startBulk(IndexDefinition indexDefinition) throws IOException {
}
@Override
public void startBulk(String index, long startRefreshInterval, long stopRefreshIterval) {
}
@Override
public void stopBulk(IndexDefinition indexDefinition) throws IOException {
}
@Override
public void stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) {
}
@ -78,6 +142,11 @@ public class MockBulkClient extends AbstractBulkClient {
return true;
}
@Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
}
@Override
public void refreshIndex(String index) {
}

View file

@ -0,0 +1,74 @@
package org.xbib.elx.common;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.NativeClient;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MockNativeClient extends AbstractNativeClient implements NativeClient {
@Override
protected void ensureClientIsPresent() {
}
@Override
public void setClient(ElasticsearchClient client) {
}
@Override
public ElasticsearchClient getClient() {
return null;
}
@Override
protected ElasticsearchClient createClient(Settings settings) throws IOException {
return null;
}
@Override
protected void closeClient() throws IOException {
}
@Override
public void init(Settings settings) throws IOException {
}
@Override
public String getClusterName() {
return null;
}
@Override
public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) {
return null;
}
@Override
public void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit) {
}
@Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
}
@Override
public long getSearchableDocs(String index) {
return 0;
}
@Override
public boolean isIndexExists(String index) {
return false;
}
@Override
public void close() throws IOException {
}
}

View file

@ -1,12 +1,25 @@
package org.xbib.elx.common;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.xbib.elx.api.SearchClient;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
* A mocked client, it does not perform any actions on a cluster. Useful for testing.
*/
public class MockSearchClient extends AbstractSearchClient {
public class MockSearchClient extends MockNativeClient implements SearchClient {
@Override
public ElasticsearchClient getClient() {
@ -35,4 +48,29 @@ public class MockSearchClient extends AbstractSearchClient {
public void close() {
// nothing to do
}
@Override
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder) {
return Optional.empty();
}
@Override
public Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder) {
return Optional.empty();
}
@Override
public Optional<SearchResponse> search(Consumer<SearchRequestBuilder> searchRequestBuilder) {
return Optional.empty();
}
@Override
public Stream<SearchHit> search(Consumer<SearchRequestBuilder> searchRequestBuilder, TimeValue scrollTime, int scrollSize) {
return null;
}
@Override
public Stream<String> getIds(Consumer<SearchRequestBuilder> queryBuilder) {
return null;
}
}

View file

@ -0,0 +1 @@
org.xbib.elx.common.MockBulkClientProvider

View file

@ -0,0 +1 @@
org.xbib.elx.common.MockSearchClientProvider

View file

@ -1,21 +0,0 @@
package org.xbib.elx.common.test;
import org.junit.jupiter.api.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.MockAdminClient;
import org.xbib.elx.common.MockAdminClientProvider;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class MockAdminClientProviderTest {
@Test
void testMockAdminProvider() throws IOException {
MockAdminClient client = ClientBuilder.builder()
.setAdminClientProvider(MockAdminClientProvider.class)
.build();
assertNotNull(client);
}
}

View file

@ -0,0 +1,35 @@
package org.xbib.elx.common.test;
import org.junit.jupiter.api.Test;
import org.xbib.elx.common.*;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class MockClientProviderTest {
@Test
void testMockAdminClientProvider() throws IOException {
MockAdminClient client = ClientBuilder.builder()
.setAdminClientProvider(MockAdminClientProvider.class)
.build();
assertNotNull(client);
}
@Test
void testMockBulkClientProvider() throws IOException {
MockBulkClient client = ClientBuilder.builder()
.setBulkClientProvider(MockBulkClientProvider.class)
.build();
assertNotNull(client);
}
@Test
void testMockSearchClientProvider() throws IOException {
MockSearchClient client = ClientBuilder.builder()
.setSearchClientProvider(MockSearchClientProvider.class)
.build();
assertNotNull(client);
}
}

View file

@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.http.HttpAdminClient;
import org.xbib.elx.http.HttpAdminClientProvider;
import org.xbib.elx.http.HttpBulkClient;
import org.xbib.elx.http.HttpBulkClientProvider;
@ -73,24 +75,27 @@ class BulkClientTest {
@Test
void testMapping() throws Exception {
final HttpBulkClient client = ClientBuilder.builder()
try (HttpAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(HttpAdminClientProvider.class)
.put(helper.getHttpSettings())
.build();
HttpBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(HttpBulkClientProvider.class)
.put(helper.getHttpSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
client.newIndex("test", Settings.EMPTY, builder);
assertTrue(client.getMapping("test", "doc").containsKey("properties"));
client.close();
.build()) {
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
assertTrue(adminClient.getMapping("test", "doc").containsKey("properties"));
}
}
@Test

View file

@ -16,6 +16,7 @@ import org.xbib.elx.http.HttpBulkClientProvider;
import org.xbib.elx.http.HttpSearchClient;
import org.xbib.elx.http.HttpSearchClientProvider;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@ExtendWith(TestExtension.class)
@ -70,7 +71,12 @@ class SearchTest {
Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()));
ids.forEach(logger::info);
final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> {
logger.info(id);
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
}
}
}

View file

@ -40,6 +40,7 @@ class SmokeTest {
.build()) {
IndexDefinition indexDefinition =
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex("test_smoke");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
@ -50,12 +51,10 @@ class SmokeTest {
bulkClient.delete("test_smoke", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS);
bulkClient.delete("test_smoke", "1");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.flush();
adminClient.deleteIndex("test_smoke");
assertEquals(0, indexDefinition.getReplicaLevel());
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.flush();
@ -63,11 +62,11 @@ class SmokeTest {
adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
assertNull(bulkClient.getBulkController().getLastBulkError());
adminClient.deleteIndex(indexDefinition);
}

View file

@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.node.NodeAdminClient;
import org.xbib.elx.node.NodeAdminClientProvider;
import org.xbib.elx.node.NodeBulkClient;
import org.xbib.elx.node.NodeBulkClientProvider;
@ -71,22 +73,25 @@ class BulkClientTest {
@Test
void testMapping() throws Exception {
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
.setAdminClientProvider(NodeAdminClientProvider.class)
.build();
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties"));
bulkClient.close();
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
.setBulkClientProvider(NodeBulkClientProvider.class)
.build()) {
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
assertTrue(adminClient.getMapping("test", "doc").containsKey("properties"));
}
}
@Test

View file

@ -16,6 +16,7 @@ import org.xbib.elx.node.NodeBulkClientProvider;
import org.xbib.elx.node.NodeSearchClient;
import org.xbib.elx.node.NodeSearchClientProvider;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@ExtendWith(TestExtension.class)
@ -68,7 +69,12 @@ class SearchTest {
Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()));
ids.forEach(logger::info);
final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> {
logger.info(id);
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
}
}
}

View file

@ -38,6 +38,7 @@ class SmokeTest {
.build()) {
IndexDefinition indexDefinition =
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex("test_smoke");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
@ -48,12 +49,10 @@ class SmokeTest {
bulkClient.delete("test_smoke", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS);
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.flush();
adminClient.deleteIndex("test_smoke");
assertEquals(0, indexDefinition.getReplicaLevel());
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.flush();
@ -61,11 +60,11 @@ class SmokeTest {
adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
assertNull(bulkClient.getBulkController().getLastBulkError());
adminClient.deleteIndex(indexDefinition);
}

View file

@ -11,6 +11,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import org.xbib.elx.transport.TransportAdminClient;
import org.xbib.elx.transport.TransportAdminClientProvider;
import org.xbib.elx.transport.TransportBulkClient;
import org.xbib.elx.transport.TransportBulkClientProvider;
@ -74,23 +76,27 @@ class BulkClientTest {
@Test
void testMapping() throws Exception {
final TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class)
try (TransportAdminClient adminClient = ClientBuilder.builder()
.setAdminClientProvider(TransportAdminClientProvider.class)
.put(helper.getTransportSettings())
.build();
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties"));
bulkClient.close();
TransportBulkClient bulkClient = ClientBuilder.builder()
.setBulkClientProvider(TransportBulkClientProvider.class)
.put(helper.getTransportSettings())
.build()) {
XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
bulkClient.newIndex("test", Settings.EMPTY, builder);
assertTrue(adminClient.getMapping("test", "doc").containsKey("properties"));
}
}
@Test

View file

@ -16,6 +16,7 @@ import org.xbib.elx.transport.TransportBulkClientProvider;
import org.xbib.elx.transport.TransportSearchClient;
import org.xbib.elx.transport.TransportSearchClientProvider;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@ExtendWith(TestExtension.class)
@ -70,7 +71,12 @@ class SearchTest {
Stream<String> ids = searchClient.getIds(qb -> qb
.setIndices("test")
.setQuery(QueryBuilders.matchAllQuery()));
ids.forEach(logger::info);
final AtomicInteger idcount = new AtomicInteger();
ids.forEach(id -> {
logger.info(id);
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
}
}
}

View file

@ -40,6 +40,7 @@ class SmokeTest {
.build()) {
IndexDefinition indexDefinition =
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
assertEquals(0, indexDefinition.getReplicaLevel());
assertEquals(helper.getClusterName(), adminClient.getClusterName());
bulkClient.newIndex("test_smoke");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
@ -50,9 +51,10 @@ class SmokeTest {
bulkClient.delete("test_smoke", "1");
bulkClient.flush();
bulkClient.waitForResponses(30, TimeUnit.SECONDS);
adminClient.checkMapping("test_smoke");
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.delete("test_smoke", "1");
bulkClient.flush();
adminClient.deleteIndex("test_smoke");
assertEquals(0, indexDefinition.getReplicaLevel());
bulkClient.newIndex(indexDefinition);
bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
bulkClient.flush();
@ -61,7 +63,7 @@ class SmokeTest {
int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(4, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}

View file

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 6.3.2.6
version = 6.3.2.7
gradle.wrapper.version = 6.4.1
elasticsearch-server.version = 6.3.2.4