diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index a8b15bc..e1eaebb 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -139,8 +139,20 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement logger.warn("no index name given to delete index"); return this; } - DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest() + .indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + .waitForNoInitializingShards(true) + .waitForNoRelocatingShards(true) + .waitForYellowStatus(); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); + if (healthResponse.isTimedOut()) { + String message = "timeout waiting for cluster shards"; + logger.error(message); + throw new IllegalStateException(message); + } return this; } @@ -151,18 +163,22 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement GetSettingsRequest settingsRequest = new GetSettingsRequest(); settingsRequest.indices(index); GetSettingsResponse settingsResponse = client.execute(GetSettingsAction.INSTANCE, settingsRequest).actionGet(); - int shards = settingsResponse.getIndexToSettings().get(index).getAsInt("index.number_of_shards", -1); + int shards = settingsResponse.getIndexToSettings() + .get(index).getAsInt("index.number_of_shards", -1); if (shards > 0) { TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() .indices(index) .waitForActiveShards(shards) + .waitForNoInitializingShards(true) + .waitForNoRelocatingShards(true) + .waitForYellowStatus() .timeout(timeout); ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - logger.error("timeout waiting for recovery"); - return false; + if (healthResponse.isTimedOut()) { + String message = "timeout waiting for cluster shards"; + logger.error(message); } } return true; diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index e4c603d..ca6ae53 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -4,7 +4,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushAction; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -27,7 +27,6 @@ import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.IndexDefinition; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -100,41 +99,41 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public void newIndex(String index) throws IOException { - newIndex(index, Settings.EMPTY, (Map) null); + newIndex(index, Settings.EMPTY, (XContentBuilder) null); } @Override public void newIndex(String index, Settings settings) throws IOException { - newIndex(index, settings, (Map) null); + newIndex(index, settings, (XContentBuilder) null); + } + + @Override + public void newIndex(String index, Settings settings, Map map) throws IOException { + newIndex(index, settings, map == null || map.isEmpty() ? null : + JsonXContent.contentBuilder().map(map)); } @Override public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { - String mappingString = Strings.toString(builder); - Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingString).mapOrdered(); - newIndex(index, settings, mappings); - } - - @Override - public void newIndex(String index, Settings settings, Map mapping) throws IOException { if (index == null) { logger.warn("no index name given to create index"); return; } ensureClientIsPresent(); waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); - CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index); + CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE); + createIndexRequestBuilder.setIndex(index); if (settings != null) { - createIndexRequest.settings(settings); + createIndexRequestBuilder.setSettings(settings); } - if (mapping != null) { - createIndexRequest.mapping(TYPE_NAME, mapping); + if (builder != null) { + // NOTE: addMapping(type, ...) API is very fragile. Use XConteBuilder for safe typing. + createIndexRequestBuilder.addMapping(TYPE_NAME, builder); } - CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); - XContentBuilder builder = XContentFactory.jsonBuilder(); + createIndexRequestBuilder.setWaitForActiveShards(1); + CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); logger.info("index {} created: {}", index, - Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS))); + Strings.toString(createIndexResponse.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java index 5c8c599..1a197ce 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java @@ -38,7 +38,7 @@ public abstract class AbstractNativeClient implements NativeClient { /** * The one and only index type name used in the extended client. - * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_". + * NOTE: all Elasticsearch version less than 6.2.0 forbid a prepending "_". */ protected static final String TYPE_NAME = "doc"; diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index 5502e9e..9f9c162 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -23,7 +23,7 @@ public class ClientBuilder { private final ElasticsearchClient client; - private final ClassLoader classLoader; + private ClassLoader classLoader; private final Settings.Builder settingsBuilder; @@ -56,6 +56,11 @@ public class ClientBuilder { return new ClientBuilder(client); } + public ClientBuilder setClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + public ClientBuilder setAdminClientProvider(Class adminClientProvider) { this.adminClientProvider = adminClientProvider; return this; @@ -140,6 +145,6 @@ public class ClientBuilder { } } } - throw new IllegalArgumentException("no provider"); + throw new IllegalArgumentException("no provider found"); } } diff --git a/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java b/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java index 0c76157..ee97253 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java +++ b/elx-http/src/main/java/org/xbib/elx/http/HttpClientHelper.java @@ -136,7 +136,7 @@ public class HttpClientHelper extends AbstractAdminClient implements Elasticsear @Override public ThreadPool threadPool() { - logger.log(Level.DEBUG, "returning null for threadPool() request"); + logger.log(Level.TRACE, "returning null for threadPool() request"); return null; } @@ -149,7 +149,7 @@ public class HttpClientHelper extends AbstractAdminClient implements Elasticsear } try { HttpActionContext httpActionContext = new HttpActionContext(this, request, url); - logger.log(Level.DEBUG, "url = " + url); + logger.log(Level.TRACE, "url = " + url); httpAction.execute(httpActionContext, listener); } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpClearScrollAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpClearScrollAction.java new file mode 100644 index 0000000..b6235ae --- /dev/null +++ b/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpClearScrollAction.java @@ -0,0 +1,39 @@ +package org.xbib.elx.http.action.search; + +import org.elasticsearch.action.search.ClearScrollAction; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.xbib.elx.http.HttpAction; +import org.xbib.netty.http.client.api.Request; +import java.io.IOException; + +public class HttpClearScrollAction extends HttpAction { + + @Override + public ClearScrollAction getActionInstance() { + return ClearScrollAction.INSTANCE; + } + + @Override + protected Request.Builder createHttpRequest(String baseUrl, ClearScrollRequest request) throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder(); + request.toXContent(builder, ToXContent.EMPTY_PARAMS); + return newDeleteRequest(baseUrl, "_search/scroll", BytesReference.bytes(builder)); + } + + @Override + protected CheckedFunction entityParser() { + return ClearScrollResponse::fromXContent; + } + + @Override + protected ClearScrollResponse emptyResponse() { + return new ClearScrollResponse(true, 0); + } +} diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchAction.java index a2e84fe..2f3b017 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchAction.java +++ b/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchAction.java @@ -5,6 +5,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.Scroll; import org.xbib.elx.http.HttpAction; import org.xbib.netty.http.client.api.Request; @@ -19,9 +20,10 @@ public class HttpSearchAction extends HttpAction @Override protected Request.Builder createHttpRequest(String url, SearchRequest request) { - // request.indices() always empty array + Scroll scroll = request.scroll(); + String params = scroll != null ? "?scroll=" + scroll.keepAlive() : ""; String index = request.indices() != null ? String.join(",", request.indices()) + "/" : ""; - return newPostRequest(url, index + "_search", request.source().toString()); + return newPostRequest(url, index + "_search" + params, request.source().toString()); } @Override diff --git a/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchScrollAction.java b/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchScrollAction.java new file mode 100644 index 0000000..cb2bda0 --- /dev/null +++ b/elx-http/src/main/java/org/xbib/elx/http/action/search/HttpSearchScrollAction.java @@ -0,0 +1,39 @@ +package org.xbib.elx.http.action.search; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollAction; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.xbib.elx.http.HttpAction; +import org.xbib.netty.http.client.api.Request; +import java.io.IOException; + +public class HttpSearchScrollAction extends HttpAction { + + @Override + public SearchScrollAction getActionInstance() { + return SearchScrollAction.INSTANCE; + } + + @Override + protected Request.Builder createHttpRequest(String baseUrl, SearchScrollRequest request) throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder(); + request.toXContent(builder, ToXContent.EMPTY_PARAMS); + return newPostRequest(baseUrl, "_search/scroll", BytesReference.bytes(builder)); + } + + @Override + protected SearchResponse emptyResponse() { + return new SearchResponse(); + } + + @Override + protected CheckedFunction entityParser() { + return SearchResponse::fromXContent; + } +} diff --git a/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction b/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction index 3e9602c..8d15006 100644 --- a/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction +++ b/elx-http/src/main/resources/META-INF/services/org.xbib.elx.http.HttpAction @@ -15,6 +15,8 @@ org.xbib.elx.http.action.admin.indices.settings.put.HttpUpdateSettingsAction org.xbib.elx.http.action.bulk.HttpBulkAction org.xbib.elx.http.action.index.HttpIndexAction org.xbib.elx.http.action.search.HttpSearchAction +org.xbib.elx.http.action.search.HttpClearScrollAction +org.xbib.elx.http.action.search.HttpSearchScrollAction org.xbib.elx.http.action.main.HttpMainAction org.xbib.elx.http.action.get.HttpExistsAction org.xbib.elx.http.action.get.HttpGetAction diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java new file mode 100644 index 0000000..b904768 --- /dev/null +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -0,0 +1,76 @@ +package org.xbib.elx.http.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.Parameters; +import org.xbib.elx.http.HttpBulkClient; +import org.xbib.elx.http.HttpBulkClientProvider; +import org.xbib.elx.http.HttpSearchClient; +import org.xbib.elx.http.HttpSearchClientProvider; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@ExtendWith(TestExtension.class) +class SearchTest { + + private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); + + private static final Long ACTIONS = 100L; + + private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + + private final TestExtension.Helper helper; + + SearchTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testDocStream() throws Exception { + long numactions = ACTIONS; + final HttpBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(HttpBulkClientProvider.class) + .put(helper.getHttpSettings()) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .build(); + try (bulkClient) { + bulkClient.newIndex("test"); + for (int i = 0; i < ACTIONS; i++) { + bulkClient.index("test", null, false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions, bulkClient.getSearchableDocs("test")); + } + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); + try (HttpSearchClient searchClient = ClientBuilder.builder() + .setSearchClientProvider(HttpSearchClientProvider.class) + .put(helper.getHttpSettings()) + .build()) { + Stream stream = searchClient.search(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMinutes(1), 10); + long count = stream.count(); + assertEquals(numactions, count); + Stream ids = searchClient.getIds(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery())); + ids.forEach(logger::info); + } + } +} diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java index d2663f5..f511e71 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java @@ -30,17 +30,16 @@ class SmokeTest { @Test void smokeTest() throws Exception { - final HttpAdminClient adminClient = ClientBuilder.builder() + try (HttpAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(HttpAdminClientProvider.class) .put(helper.getHttpSettings()) .build(); - final HttpBulkClient bulkClient = ClientBuilder.builder() + HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .build(); - IndexDefinition indexDefinition = adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.builder() - .build()); - try { + .build()) { + IndexDefinition indexDefinition = + adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -64,17 +63,13 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - } finally { - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); assertNull(bulkClient.getBulkController().getLastBulkError()); - // close admin after bulk adminClient.deleteIndex(indexDefinition); - adminClient.close(); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java new file mode 100644 index 0000000..ea458f8 --- /dev/null +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -0,0 +1,74 @@ +package org.xbib.elx.node.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.Parameters; +import org.xbib.elx.node.NodeBulkClient; +import org.xbib.elx.node.NodeBulkClientProvider; +import org.xbib.elx.node.NodeSearchClient; +import org.xbib.elx.node.NodeSearchClientProvider; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@ExtendWith(TestExtension.class) +class SearchTest { + + private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); + + private static final Long ACTIONS = 100L; + + private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + + private final TestExtension.Helper helper; + + SearchTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testDocStream() throws Exception { + long numactions = ACTIONS; + final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + .setBulkClientProvider(NodeBulkClientProvider.class) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .build(); + try (bulkClient) { + bulkClient.newIndex("test"); + for (int i = 0; i < ACTIONS; i++) { + bulkClient.index("test", null, false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions, bulkClient.getSearchableDocs("test")); + } + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); + try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1")) + .setSearchClientProvider(NodeSearchClientProvider.class) + .build()) { + Stream stream = searchClient.search(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMinutes(1), 10); + long count = stream.count(); + assertEquals(numactions, count); + Stream ids = searchClient.getIds(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery())); + ids.forEach(logger::info); + } + } +} diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index c802cd0..aadf1b9 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -30,15 +30,14 @@ class SmokeTest { @Test void smokeTest() throws Exception { - final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) .setAdminClientProvider(NodeAdminClientProvider.class) .build(); - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) .setBulkClientProvider(NodeBulkClientProvider.class) - .build(); - IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); - try { + .build()) { + IndexDefinition indexDefinition = + adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -62,17 +61,13 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - } finally { - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); assertNull(bulkClient.getBulkController().getLastBulkError()); - // close admin after bulk adminClient.deleteIndex(indexDefinition); - adminClient.close(); } } } diff --git a/elx-node/src/test/resources/log4j2-test.xml b/elx-node/src/test/resources/log4j2-test.xml index 6c323f8..11bffcf 100644 --- a/elx-node/src/test/resources/log4j2-test.xml +++ b/elx-node/src/test/resources/log4j2-test.xml @@ -10,4 +10,4 @@ - \ No newline at end of file + diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java new file mode 100644 index 0000000..ffc634b --- /dev/null +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -0,0 +1,76 @@ +package org.xbib.elx.transport.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.Parameters; +import org.xbib.elx.transport.TransportBulkClient; +import org.xbib.elx.transport.TransportBulkClientProvider; +import org.xbib.elx.transport.TransportSearchClient; +import org.xbib.elx.transport.TransportSearchClientProvider; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@ExtendWith(TestExtension.class) +class SearchTest { + + private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); + + private static final Long ACTIONS = 100L; + + private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + + private final TestExtension.Helper helper; + + SearchTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testDocStream() throws Exception { + long numactions = ACTIONS; + final TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .build(); + try (bulkClient) { + bulkClient.newIndex("test"); + for (int i = 0; i < ACTIONS; i++) { + bulkClient.index("test", null, false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions, bulkClient.getSearchableDocs("test")); + } + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); + try (TransportSearchClient searchClient = ClientBuilder.builder() + .setSearchClientProvider(TransportSearchClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { + Stream stream = searchClient.search(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMinutes(1), 10); + long count = stream.count(); + assertEquals(numactions, count); + Stream ids = searchClient.getIds(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery())); + ids.forEach(logger::info); + } + } +} diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index b005f12..64bf39e 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -30,17 +30,16 @@ class SmokeTest { @Test void smokeTest() throws Exception { - final TransportAdminClient adminClient = ClientBuilder.builder() + try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - final TransportBulkClient bulkClient = ClientBuilder.builder() + TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .build(); - IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); - try { + .build()) { + IndexDefinition indexDefinition = + adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex("test_smoke"); bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest @@ -61,17 +60,13 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - } finally { - bulkClient.close(); assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); assertEquals(4, bulkClient.getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - // close admin after bulk adminClient.deleteIndex(indexDefinition); - adminClient.close(); } } } diff --git a/elx-transport/src/test/resources/log4j2.xml b/elx-transport/src/test/resources/log4j2-test.xml similarity index 100% rename from elx-transport/src/test/resources/log4j2.xml rename to elx-transport/src/test/resources/log4j2-test.xml