diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index bdb5b5d..c5357f0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -22,6 +22,10 @@ public interface AdminClient extends NativeClient { */ IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException; + Map getMapping(String index); + + Map getMapping(String index, String type); + /** * Delete an index. * @param indexDefinition the index definition @@ -89,17 +93,6 @@ public interface AdminClient extends NativeClient { */ boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit); - - /** - * Wait for index recovery (after replica change). - * - * @param index index - * @param maxWaitTime maximum wait time - * @param timeUnit time unit - * @return true if wait succeeded, false if wait timed out - */ - boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit); - /** * Resolve alias. * diff --git a/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java b/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java index 52fbbf3..243be58 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java @@ -55,7 +55,7 @@ public interface NativeClient extends Closeable { */ void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); - Map getMapping(String index, String mapping); + void waitForShards(long maxWaitTime, TimeUnit timeUnit); long getSearchableDocs(String index); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index e1eaebb..bd46cee 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; @@ -127,6 +128,21 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement } }; + @Override + public Map getMapping(String index) { + return getMapping(index, TYPE_NAME); + } + + @Override + public Map getMapping(String index, String mapping) { + GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) + .setIndices(index) + .setTypes(mapping); + GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); + logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap()); + return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap(); + } + @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { return deleteIndex(indexDefinition.getFullIndexName()); @@ -134,56 +150,19 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement @Override public AdminClient deleteIndex(String index) { - ensureClientIsPresent(); if (index == null) { logger.warn("no index name given to delete index"); return this; } + ensureClientIsPresent(); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest() .indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); - ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - .waitForNoInitializingShards(true) - .waitForNoRelocatingShards(true) - .waitForYellowStatus(); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse.isTimedOut()) { - String message = "timeout waiting for cluster shards"; - logger.error(message); - throw new IllegalStateException(message); - } + waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); + waitForShards(30L, TimeUnit.SECONDS); return this; } - @Override - public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { - ensureClientIsPresent(); - ensureIndexGiven(index); - GetSettingsRequest settingsRequest = new GetSettingsRequest(); - settingsRequest.indices(index); - GetSettingsResponse settingsResponse = client.execute(GetSettingsAction.INSTANCE, settingsRequest).actionGet(); - int shards = settingsResponse.getIndexToSettings() - .get(index).getAsInt("index.number_of_shards", -1); - if (shards > 0) { - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - .indices(index) - .waitForActiveShards(shards) - .waitForNoInitializingShards(true) - .waitForNoRelocatingShards(true) - .waitForYellowStatus() - .timeout(timeout); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse.isTimedOut()) { - String message = "timeout waiting for cluster shards"; - logger.error(message); - } - } - return true; - } - @Override public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { return updateReplicaLevel(indexDefinition.getFullIndexName(), level, @@ -192,11 +171,12 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement @Override public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException { - waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations - if (level > 0) { - updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); - waitForRecovery(index, maxWaitTime, timeUnit); + if (level < 1) { + logger.warn("invalid replica level"); + return this; } + updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); + waitForShards(maxWaitTime, timeUnit); return this; } @@ -556,12 +536,6 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement } } - private void ensureIndexGiven(String index) { - if (index == null) { - throw new IllegalArgumentException("no index given"); - } - } - private Map getFilters(GetAliasesResponse getAliasesResponse) { Map result = new HashMap<>(); for (ObjectObjectCursor> object : getAliasesResponse.getAliases()) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index ca6ae53..d2ce66d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -42,14 +42,15 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public void init(Settings settings) throws IOException { - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); super.init(settings); if (bulkMetric == null) { bulkMetric = new DefaultBulkMetric(); + logger.log(Level.INFO, "initializing bulk metric with settings = " + settings.toDelimitedString(',')); bulkMetric.init(settings); } if (bulkController == null) { bulkController = new DefaultBulkController(this, bulkMetric); + logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(',')); bulkController.init(settings); } } @@ -120,7 +121,6 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements return; } ensureClientIsPresent(); - waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE); createIndexRequestBuilder.setIndex(index); if (settings != null) { @@ -130,10 +130,11 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements // NOTE: addMapping(type, ...) API is very fragile. Use XConteBuilder for safe typing. createIndexRequestBuilder.addMapping(TYPE_NAME, builder); } - createIndexRequestBuilder.setWaitForActiveShards(1); CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); logger.info("index {} created: {}", index, Strings.toString(createIndexResponse.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))); + waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); + waitForShards(30L, TimeUnit.SECONDS); } @Override @@ -219,7 +220,9 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public boolean waitForResponses(long timeout, TimeUnit timeUnit) { ensureClientIsPresent(); - return bulkController.waitForResponses(timeout, timeUnit); + boolean success = bulkController.waitForResponses(timeout, timeUnit); + logger.info("waited for all bulk responses: " + success); + return success; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java index 1a197ce..d4fa393 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java @@ -13,9 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.search.SearchAction; @@ -28,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.xbib.elx.api.NativeClient; import java.io.IOException; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,11 +60,10 @@ public abstract class AbstractNativeClient implements NativeClient { protected abstract void closeClient() throws IOException; - @Override public void init(Settings settings) throws IOException { - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); if (client == null) { + logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); client = createClient(settings); } } @@ -96,15 +91,35 @@ public abstract class AbstractNativeClient implements NativeClient { @Override public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); + logger.info("waiting for cluster status " + statusString); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + .timeout(timeout) + .waitForStatus(status); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); + if (healthResponse.isTimedOut()) { String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name(); - if (logger.isErrorEnabled()) { - logger.error(message); - } + logger.error(message); + throw new IllegalStateException(message); + } + } + + @Override + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + ensureClientIsPresent(); + logger.info("waiting for cluster shard settling"); + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + .waitForNoInitializingShards(true) + .waitForNoRelocatingShards(true) + .timeout(timeout); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); + if (healthResponse.isTimedOut()) { + String message = "timeout waiting for cluster shards"; + logger.error(message); throw new IllegalStateException(message); } } @@ -130,16 +145,6 @@ public abstract class AbstractNativeClient implements NativeClient { } } - @Override - public Map getMapping(String index, String mapping) { - GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) - .setIndices(index) - .setTypes(mapping); - GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); - logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap()); - return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap(); - } - @Override public long getSearchableDocs(String index) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) @@ -158,7 +163,6 @@ public abstract class AbstractNativeClient implements NativeClient { return indicesExistsResponse.isExists(); } - @Override public void close() throws IOException { ensureClientIsPresent(); @@ -186,9 +190,6 @@ public abstract class AbstractNativeClient implements NativeClient { } protected void ensureClientIsPresent() { - if (this instanceof MockAdminClient) { - return; - } if (client == null) { throw new IllegalStateException("no client"); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 557befc..8cf955a 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -47,7 +47,7 @@ public class DefaultBulkController implements BulkController { private BulkListener bulkListener; - private AtomicBoolean active; + private final AtomicBoolean active; private boolean enableBulkLogging; diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index a63c989..ece099f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -2,13 +2,22 @@ package org.xbib.elx.common; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; +import org.xbib.elx.api.*; +import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ -public class MockAdminClient extends AbstractAdminClient { +public class MockAdminClient extends MockNativeClient implements AdminClient { + + @Override + public void setClient(ElasticsearchClient client) { + + } @Override public ElasticsearchClient getClient() { @@ -20,16 +29,42 @@ public class MockAdminClient extends AbstractAdminClient { } @Override - protected ElasticsearchClient createClient(Settings settings) { + public String getClusterName() { return null; } @Override - protected void closeClient() { + public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { + return null; } @Override - public MockAdminClient deleteIndex(String index) { + public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException { + return null; + } + + @Override + public Map getMapping(String index) { + return null; + } + + @Override + public Map getMapping(String index, String type) { + return null; + } + + @Override + public AdminClient deleteIndex(IndexDefinition indexDefinition) { + return this; + } + + @Override + public AdminClient deleteIndex(String index) { + return this; + } + + @Override + public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { return this; } @@ -38,20 +73,95 @@ public class MockAdminClient extends AbstractAdminClient { return true; } + @Override + public String resolveAlias(String alias) { + return null; + } + + @Override + public String resolveMostRecentIndex(String alias) { + return null; + } + + @Override + public Map getAliases(String index) { + return null; + } + + @Override + public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases) { + return null; + } + + @Override + public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases, IndexAliasAdder indexAliasAdder) { + return null; + } + + @Override + public IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases) { + return null; + } + + @Override + public IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases, IndexAliasAdder adder) { + return null; + } + + @Override + public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { + return null; + } + + @Override + public IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform) { + return null; + } + + @Override + public Long mostRecentDocument(String index, String timestampfieldname) throws IOException { + return null; + } + @Override public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) { } @Override - public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { - return true; + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + } @Override - public MockAdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) { + public long getSearchableDocs(String index) { + return 0; + } + + @Override + public boolean isIndexExists(String index) { + return false; + } + + @Override + public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) { return this; } + @Override + public int getReplicaLevel(IndexDefinition indexDefinition) { + return 0; + } + + @Override + public int getReplicaLevel(String index) { + return 0; + } + + @Override + public boolean forceMerge(IndexDefinition indexDefinition) { + return false; + } + @Override public void close() { // nothing to do diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index 7c78f83..178c74f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -4,13 +4,22 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.xbib.elx.api.BulkClient; +import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkMetric; +import org.xbib.elx.api.IndexDefinition; + +import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeUnit; /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ -public class MockBulkClient extends AbstractBulkClient { +public class MockBulkClient extends MockNativeClient implements BulkClient { @Override public ElasticsearchClient getClient() { @@ -35,6 +44,46 @@ public class MockBulkClient extends AbstractBulkClient { protected void closeClient() { } + @Override + public BulkMetric getBulkMetric() { + return null; + } + + @Override + public BulkController getBulkController() { + return null; + } + + @Override + public void newIndex(String index) throws IOException { + + } + + @Override + public void newIndex(IndexDefinition indexDefinition) throws IOException { + + } + + @Override + public void newIndex(String index, Settings settings) throws IOException { + + } + + @Override + public void newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException { + + } + + @Override + public void newIndex(String index, Settings settings, Map mapping) throws IOException { + + } + + @Override + public BulkClient index(String index, String id, boolean create, BytesReference source) { + return null; + } + @Override public MockBulkClient index(String index, String id, boolean create, String source) { return this; @@ -60,15 +109,30 @@ public class MockBulkClient extends AbstractBulkClient { return this; } + @Override + public BulkClient update(String index, String id, BytesReference source) { + return null; + } + @Override public MockBulkClient update(UpdateRequest updateRequest) { return this; } + @Override + public void startBulk(IndexDefinition indexDefinition) throws IOException { + + } + @Override public void startBulk(String index, long startRefreshInterval, long stopRefreshIterval) { } + @Override + public void stopBulk(IndexDefinition indexDefinition) throws IOException { + + } + @Override public void stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) { } @@ -78,6 +142,11 @@ public class MockBulkClient extends AbstractBulkClient { return true; } + @Override + public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { + + } + @Override public void refreshIndex(String index) { } diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockNativeClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockNativeClient.java new file mode 100644 index 0000000..c15464c --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/MockNativeClient.java @@ -0,0 +1,74 @@ +package org.xbib.elx.common; + +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.settings.Settings; +import org.xbib.elx.api.NativeClient; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class MockNativeClient extends AbstractNativeClient implements NativeClient { + + @Override + protected void ensureClientIsPresent() { + } + + @Override + public void setClient(ElasticsearchClient client) { + } + + @Override + public ElasticsearchClient getClient() { + return null; + } + + @Override + protected ElasticsearchClient createClient(Settings settings) throws IOException { + return null; + } + + @Override + protected void closeClient() throws IOException { + + } + + @Override + public void init(Settings settings) throws IOException { + + } + + @Override + public String getClusterName() { + return null; + } + + @Override + public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { + return null; + } + + @Override + public void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit) { + + } + + @Override + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + + } + + @Override + public long getSearchableDocs(String index) { + return 0; + } + + @Override + public boolean isIndexExists(String index) { + return false; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java index 07c0091..257e4db 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java @@ -1,12 +1,25 @@ package org.xbib.elx.common; +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetRequestBuilder; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; +import org.xbib.elx.api.SearchClient; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Stream; /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ -public class MockSearchClient extends AbstractSearchClient { +public class MockSearchClient extends MockNativeClient implements SearchClient { @Override public ElasticsearchClient getClient() { @@ -35,4 +48,29 @@ public class MockSearchClient extends AbstractSearchClient { public void close() { // nothing to do } + + @Override + public Optional get(Consumer getRequestBuilder) { + return Optional.empty(); + } + + @Override + public Optional multiGet(Consumer multiGetRequestBuilder) { + return Optional.empty(); + } + + @Override + public Optional search(Consumer searchRequestBuilder) { + return Optional.empty(); + } + + @Override + public Stream search(Consumer searchRequestBuilder, TimeValue scrollTime, int scrollSize) { + return null; + } + + @Override + public Stream getIds(Consumer queryBuilder) { + return null; + } } diff --git a/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.BulkClientProvider b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.BulkClientProvider new file mode 100644 index 0000000..23d86e7 --- /dev/null +++ b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.BulkClientProvider @@ -0,0 +1 @@ +org.xbib.elx.common.MockBulkClientProvider \ No newline at end of file diff --git a/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.SearchClientProvider b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.SearchClientProvider new file mode 100644 index 0000000..ec8036b --- /dev/null +++ b/elx-common/src/main/resources/META-INF/services/org.xbib.elx.api.SearchClientProvider @@ -0,0 +1 @@ +org.xbib.elx.common.MockSearchClientProvider \ No newline at end of file diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/MockAdminClientProviderTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/MockAdminClientProviderTest.java deleted file mode 100644 index c7123e8..0000000 --- a/elx-common/src/test/java/org/xbib/elx/common/test/MockAdminClientProviderTest.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.xbib.elx.common.test; - -import org.junit.jupiter.api.Test; -import org.xbib.elx.common.ClientBuilder; -import org.xbib.elx.common.MockAdminClient; -import org.xbib.elx.common.MockAdminClientProvider; - -import java.io.IOException; - -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class MockAdminClientProviderTest { - - @Test - void testMockAdminProvider() throws IOException { - MockAdminClient client = ClientBuilder.builder() - .setAdminClientProvider(MockAdminClientProvider.class) - .build(); - assertNotNull(client); - } -} diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/MockClientProviderTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/MockClientProviderTest.java new file mode 100644 index 0000000..77e42d1 --- /dev/null +++ b/elx-common/src/test/java/org/xbib/elx/common/test/MockClientProviderTest.java @@ -0,0 +1,35 @@ +package org.xbib.elx.common.test; + +import org.junit.jupiter.api.Test; +import org.xbib.elx.common.*; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class MockClientProviderTest { + + @Test + void testMockAdminClientProvider() throws IOException { + MockAdminClient client = ClientBuilder.builder() + .setAdminClientProvider(MockAdminClientProvider.class) + .build(); + assertNotNull(client); + } + + @Test + void testMockBulkClientProvider() throws IOException { + MockBulkClient client = ClientBuilder.builder() + .setBulkClientProvider(MockBulkClientProvider.class) + .build(); + assertNotNull(client); + } + + @Test + void testMockSearchClientProvider() throws IOException { + MockSearchClient client = ClientBuilder.builder() + .setSearchClientProvider(MockSearchClientProvider.class) + .build(); + assertNotNull(client); + } +} diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java index c5e7b66..fdb28c6 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java @@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import org.xbib.elx.http.HttpAdminClient; +import org.xbib.elx.http.HttpAdminClientProvider; import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; @@ -73,24 +75,27 @@ class BulkClientTest { @Test void testMapping() throws Exception { - final HttpBulkClient client = ClientBuilder.builder() + try (HttpAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(HttpAdminClientProvider.class) + .put(helper.getHttpSettings()) + .build(); + HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) - .build(); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); - client.newIndex("test", Settings.EMPTY, builder); - assertTrue(client.getMapping("test", "doc").containsKey("properties")); - client.close(); + .build()) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + bulkClient.newIndex("test", Settings.EMPTY, builder); + assertTrue(adminClient.getMapping("test", "doc").containsKey("properties")); + } } @Test diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java index b904768..1b126e9 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -16,6 +16,7 @@ import org.xbib.elx.http.HttpBulkClientProvider; import org.xbib.elx.http.HttpSearchClient; import org.xbib.elx.http.HttpSearchClientProvider; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @ExtendWith(TestExtension.class) @@ -70,7 +71,12 @@ class SearchTest { Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); - ids.forEach(logger::info); + final AtomicInteger idcount = new AtomicInteger(); + ids.forEach(id -> { + logger.info(id); + idcount.incrementAndGet(); + }); + assertEquals(numactions, idcount.get()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java index f511e71..d255c0a 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java @@ -40,6 +40,7 @@ class SmokeTest { .build()) { IndexDefinition indexDefinition = adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -50,12 +51,10 @@ class SmokeTest { bulkClient.delete("test_smoke", "1"); bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS); - bulkClient.delete("test_smoke", "1"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete("test_smoke", "1"); bulkClient.flush(); adminClient.deleteIndex("test_smoke"); - assertEquals(0, indexDefinition.getReplicaLevel()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.flush(); @@ -63,11 +62,11 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); + assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); assertNull(bulkClient.getBulkController().getLastBulkError()); adminClient.deleteIndex(indexDefinition); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 16b2c69..2c3af66 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import org.xbib.elx.node.NodeAdminClient; +import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -71,22 +73,25 @@ class BulkClientTest { @Test void testMapping() throws Exception { - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) - .setBulkClientProvider(NodeBulkClientProvider.class) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + .setAdminClientProvider(NodeAdminClientProvider.class) .build(); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties")); - bulkClient.close(); + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + .setBulkClientProvider(NodeBulkClientProvider.class) + .build()) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + bulkClient.newIndex("test", Settings.EMPTY, builder); + assertTrue(adminClient.getMapping("test", "doc").containsKey("properties")); + } } @Test diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index ea458f8..2fa2f0b 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -16,6 +16,7 @@ import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeSearchClient; import org.xbib.elx.node.NodeSearchClientProvider; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @ExtendWith(TestExtension.class) @@ -68,7 +69,12 @@ class SearchTest { Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); - ids.forEach(logger::info); + final AtomicInteger idcount = new AtomicInteger(); + ids.forEach(id -> { + logger.info(id); + idcount.incrementAndGet(); + }); + assertEquals(numactions, idcount.get()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index aadf1b9..af53da5 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -38,6 +38,7 @@ class SmokeTest { .build()) { IndexDefinition indexDefinition = adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -48,12 +49,10 @@ class SmokeTest { bulkClient.delete("test_smoke", "1"); bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.delete("test_smoke", "1"); bulkClient.flush(); adminClient.deleteIndex("test_smoke"); - assertEquals(0, indexDefinition.getReplicaLevel()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.flush(); @@ -61,11 +60,11 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); + assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); assertNull(bulkClient.getBulkController().getLastBulkError()); adminClient.deleteIndex(indexDefinition); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 388f6f3..1eee23c 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -11,6 +11,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import org.xbib.elx.transport.TransportAdminClient; +import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -74,23 +76,27 @@ class BulkClientTest { @Test void testMapping() throws Exception { - final TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) + try (TransportAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties")); - bulkClient.close(); + TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + bulkClient.newIndex("test", Settings.EMPTY, builder); + assertTrue(adminClient.getMapping("test", "doc").containsKey("properties")); + } } @Test diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index ffc634b..c923a57 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -16,6 +16,7 @@ import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportSearchClient; import org.xbib.elx.transport.TransportSearchClientProvider; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @ExtendWith(TestExtension.class) @@ -70,7 +71,12 @@ class SearchTest { Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); - ids.forEach(logger::info); + final AtomicInteger idcount = new AtomicInteger(); + ids.forEach(id -> { + logger.info(id); + idcount.incrementAndGet(); + }); + assertEquals(numactions, idcount.get()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 64bf39e..509d8ad 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -40,6 +40,7 @@ class SmokeTest { .build()) { IndexDefinition indexDefinition = adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -50,9 +51,10 @@ class SmokeTest { bulkClient.delete("test_smoke", "1"); bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.checkMapping("test_smoke"); + bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete("test_smoke", "1"); + bulkClient.flush(); adminClient.deleteIndex("test_smoke"); - assertEquals(0, indexDefinition.getReplicaLevel()); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.flush(); @@ -61,7 +63,7 @@ class SmokeTest { int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(4, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/gradle.properties b/gradle.properties index 30af3b4..e3f9a52 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 6.3.2.6 +version = 6.3.2.7 gradle.wrapper.version = 6.4.1 elasticsearch-server.version = 6.3.2.4