fix HTTP scroll search, fix wait after index creation/deletion, new search test
This commit is contained in:
parent
f2c50d9759
commit
4a2cb095df
17 changed files with 375 additions and 62 deletions
|
@ -139,8 +139,20 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
logger.warn("no index name given to delete index");
|
||||
return this;
|
||||
}
|
||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index);
|
||||
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest()
|
||||
.indices(index);
|
||||
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet();
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
.waitForNoInitializingShards(true)
|
||||
.waitForNoRelocatingShards(true)
|
||||
.waitForYellowStatus();
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse.isTimedOut()) {
|
||||
String message = "timeout waiting for cluster shards";
|
||||
logger.error(message);
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -151,18 +163,22 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement
|
|||
GetSettingsRequest settingsRequest = new GetSettingsRequest();
|
||||
settingsRequest.indices(index);
|
||||
GetSettingsResponse settingsResponse = client.execute(GetSettingsAction.INSTANCE, settingsRequest).actionGet();
|
||||
int shards = settingsResponse.getIndexToSettings().get(index).getAsInt("index.number_of_shards", -1);
|
||||
int shards = settingsResponse.getIndexToSettings()
|
||||
.get(index).getAsInt("index.number_of_shards", -1);
|
||||
if (shards > 0) {
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
.indices(index)
|
||||
.waitForActiveShards(shards)
|
||||
.waitForNoInitializingShards(true)
|
||||
.waitForNoRelocatingShards(true)
|
||||
.waitForYellowStatus()
|
||||
.timeout(timeout);
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
logger.error("timeout waiting for recovery");
|
||||
return false;
|
||||
if (healthResponse.isTimedOut()) {
|
||||
String message = "timeout waiting for cluster shards";
|
||||
logger.error(message);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -4,7 +4,7 @@ import org.apache.logging.log4j.Level;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushAction;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
|
@ -27,7 +27,6 @@ import org.xbib.elx.api.BulkClient;
|
|||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
@ -100,41 +99,41 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements
|
|||
|
||||
@Override
|
||||
public void newIndex(String index) throws IOException {
|
||||
newIndex(index, Settings.EMPTY, (Map<String, ?>) null);
|
||||
newIndex(index, Settings.EMPTY, (XContentBuilder) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings) throws IOException {
|
||||
newIndex(index, settings, (Map<String, ?>) null);
|
||||
newIndex(index, settings, (XContentBuilder) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, Map<String, ?> map) throws IOException {
|
||||
newIndex(index, settings, map == null || map.isEmpty() ? null :
|
||||
JsonXContent.contentBuilder().map(map));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
|
||||
String mappingString = Strings.toString(builder);
|
||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingString).mapOrdered();
|
||||
newIndex(index, settings, mappings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
|
||||
if (index == null) {
|
||||
logger.warn("no index name given to create index");
|
||||
return;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
|
||||
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE);
|
||||
createIndexRequestBuilder.setIndex(index);
|
||||
if (settings != null) {
|
||||
createIndexRequest.settings(settings);
|
||||
createIndexRequestBuilder.setSettings(settings);
|
||||
}
|
||||
if (mapping != null) {
|
||||
createIndexRequest.mapping(TYPE_NAME, mapping);
|
||||
if (builder != null) {
|
||||
// NOTE: addMapping(type, ...) API is very fragile. Use XConteBuilder for safe typing.
|
||||
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
|
||||
}
|
||||
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
createIndexRequestBuilder.setWaitForActiveShards(1);
|
||||
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
|
||||
logger.info("index {} created: {}", index,
|
||||
Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS)));
|
||||
Strings.toString(createIndexResponse.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -38,7 +38,7 @@ public abstract class AbstractNativeClient implements NativeClient {
|
|||
|
||||
/**
|
||||
* The one and only index type name used in the extended client.
|
||||
* Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
|
||||
* NOTE: all Elasticsearch version less than 6.2.0 forbid a prepending "_".
|
||||
*/
|
||||
protected static final String TYPE_NAME = "doc";
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ public class ClientBuilder {
|
|||
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final ClassLoader classLoader;
|
||||
private ClassLoader classLoader;
|
||||
|
||||
private final Settings.Builder settingsBuilder;
|
||||
|
||||
|
@ -56,6 +56,11 @@ public class ClientBuilder {
|
|||
return new ClientBuilder(client);
|
||||
}
|
||||
|
||||
public ClientBuilder setClassLoader(ClassLoader classLoader) {
|
||||
this.classLoader = classLoader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClientBuilder setAdminClientProvider(Class<? extends AdminClientProvider> adminClientProvider) {
|
||||
this.adminClientProvider = adminClientProvider;
|
||||
return this;
|
||||
|
@ -140,6 +145,6 @@ public class ClientBuilder {
|
|||
}
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException("no provider");
|
||||
throw new IllegalArgumentException("no provider found");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -136,7 +136,7 @@ public class HttpClientHelper extends AbstractAdminClient implements Elasticsear
|
|||
|
||||
@Override
|
||||
public ThreadPool threadPool() {
|
||||
logger.log(Level.DEBUG, "returning null for threadPool() request");
|
||||
logger.log(Level.TRACE, "returning null for threadPool() request");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ public class HttpClientHelper extends AbstractAdminClient implements Elasticsear
|
|||
}
|
||||
try {
|
||||
HttpActionContext httpActionContext = new HttpActionContext(this, request, url);
|
||||
logger.log(Level.DEBUG, "url = " + url);
|
||||
logger.log(Level.TRACE, "url = " + url);
|
||||
httpAction.execute(httpActionContext, listener);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package org.xbib.elx.http.action.search;
|
||||
|
||||
import org.elasticsearch.action.search.ClearScrollAction;
|
||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||
import org.elasticsearch.action.search.ClearScrollResponse;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.xbib.elx.http.HttpAction;
|
||||
import org.xbib.netty.http.client.api.Request;
|
||||
import java.io.IOException;
|
||||
|
||||
public class HttpClearScrollAction extends HttpAction<ClearScrollRequest, ClearScrollResponse> {
|
||||
|
||||
@Override
|
||||
public ClearScrollAction getActionInstance() {
|
||||
return ClearScrollAction.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request.Builder createHttpRequest(String baseUrl, ClearScrollRequest request) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
request.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
return newDeleteRequest(baseUrl, "_search/scroll", BytesReference.bytes(builder));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CheckedFunction<XContentParser, ClearScrollResponse, IOException> entityParser() {
|
||||
return ClearScrollResponse::fromXContent;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClearScrollResponse emptyResponse() {
|
||||
return new ClearScrollResponse(true, 0);
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@ import org.elasticsearch.action.search.SearchRequest;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
import org.xbib.elx.http.HttpAction;
|
||||
import org.xbib.netty.http.client.api.Request;
|
||||
|
||||
|
@ -19,9 +20,10 @@ public class HttpSearchAction extends HttpAction<SearchRequest, SearchResponse>
|
|||
|
||||
@Override
|
||||
protected Request.Builder createHttpRequest(String url, SearchRequest request) {
|
||||
// request.indices() always empty array
|
||||
Scroll scroll = request.scroll();
|
||||
String params = scroll != null ? "?scroll=" + scroll.keepAlive() : "";
|
||||
String index = request.indices() != null ? String.join(",", request.indices()) + "/" : "";
|
||||
return newPostRequest(url, index + "_search", request.source().toString());
|
||||
return newPostRequest(url, index + "_search" + params, request.source().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package org.xbib.elx.http.action.search;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollAction;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.xbib.elx.http.HttpAction;
|
||||
import org.xbib.netty.http.client.api.Request;
|
||||
import java.io.IOException;
|
||||
|
||||
public class HttpSearchScrollAction extends HttpAction<SearchScrollRequest, SearchResponse> {
|
||||
|
||||
@Override
|
||||
public SearchScrollAction getActionInstance() {
|
||||
return SearchScrollAction.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Request.Builder createHttpRequest(String baseUrl, SearchScrollRequest request) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
request.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
return newPostRequest(baseUrl, "_search/scroll", BytesReference.bytes(builder));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SearchResponse emptyResponse() {
|
||||
return new SearchResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CheckedFunction<XContentParser, SearchResponse, IOException> entityParser() {
|
||||
return SearchResponse::fromXContent;
|
||||
}
|
||||
}
|
|
@ -15,6 +15,8 @@ org.xbib.elx.http.action.admin.indices.settings.put.HttpUpdateSettingsAction
|
|||
org.xbib.elx.http.action.bulk.HttpBulkAction
|
||||
org.xbib.elx.http.action.index.HttpIndexAction
|
||||
org.xbib.elx.http.action.search.HttpSearchAction
|
||||
org.xbib.elx.http.action.search.HttpClearScrollAction
|
||||
org.xbib.elx.http.action.search.HttpSearchScrollAction
|
||||
org.xbib.elx.http.action.main.HttpMainAction
|
||||
org.xbib.elx.http.action.get.HttpExistsAction
|
||||
org.xbib.elx.http.action.get.HttpGetAction
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package org.xbib.elx.http.test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.http.HttpBulkClient;
|
||||
import org.xbib.elx.http.HttpBulkClientProvider;
|
||||
import org.xbib.elx.http.HttpSearchClient;
|
||||
import org.xbib.elx.http.HttpSearchClientProvider;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
class SearchTest {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (HttpSearchClient searchClient = ClientBuilder.builder()
|
||||
.setSearchClientProvider(HttpSearchClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.build()) {
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMinutes(1), 10);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
ids.forEach(logger::info);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,17 +30,16 @@ class SmokeTest {
|
|||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
final HttpAdminClient adminClient = ClientBuilder.builder()
|
||||
try (HttpAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(HttpAdminClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.build();
|
||||
final HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.build();
|
||||
IndexDefinition indexDefinition = adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.builder()
|
||||
.build());
|
||||
try {
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
assertEquals(helper.getClusterName(), adminClient.getClusterName());
|
||||
bulkClient.newIndex("test_smoke");
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
|
@ -64,17 +63,13 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
} finally {
|
||||
bulkClient.close();
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
// close admin after bulk
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
adminClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
package org.xbib.elx.node.test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||
import org.xbib.elx.node.NodeSearchClient;
|
||||
import org.xbib.elx.node.NodeSearchClientProvider;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
class SearchTest {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||
.build()) {
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMinutes(1), 10);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
ids.forEach(logger::info);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,15 +30,14 @@ class SmokeTest {
|
|||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setAdminClientProvider(NodeAdminClientProvider.class)
|
||||
.build();
|
||||
final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1"))
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.build();
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
try {
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
assertEquals(helper.getClusterName(), adminClient.getClusterName());
|
||||
bulkClient.newIndex("test_smoke");
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
|
@ -62,17 +61,13 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
} finally {
|
||||
bulkClient.close();
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
// close admin after bulk
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
adminClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,4 +10,4 @@
|
|||
<AppenderRef ref="Console" />
|
||||
</Root>
|
||||
</Loggers>
|
||||
</configuration>
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package org.xbib.elx.transport.test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||
import org.xbib.elx.transport.TransportSearchClient;
|
||||
import org.xbib.elx.transport.TransportSearchClientProvider;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ExtendWith(TestExtension.class)
|
||||
class SearchTest {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SearchTest.class.getName());
|
||||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 10L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
this.helper = helper;
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDocStream() throws Exception {
|
||||
long numactions = ACTIONS;
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build();
|
||||
try (bulkClient) {
|
||||
bulkClient.newIndex("test");
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
bulkClient.index("test", null, false,
|
||||
"{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex("test");
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||
}
|
||||
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
try (TransportSearchClient searchClient = ClientBuilder.builder()
|
||||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()),
|
||||
TimeValue.timeValueMinutes(1), 10);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices("test")
|
||||
.setQuery(QueryBuilders.matchAllQuery()));
|
||||
ids.forEach(logger::info);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,17 +30,16 @@ class SmokeTest {
|
|||
|
||||
@Test
|
||||
void smokeTest() throws Exception {
|
||||
final TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
try {
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition =
|
||||
adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY);
|
||||
assertEquals(helper.getClusterName(), adminClient.getClusterName());
|
||||
bulkClient.newIndex("test_smoke");
|
||||
bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
|
@ -61,17 +60,13 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
} finally {
|
||||
bulkClient.close();
|
||||
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
|
||||
assertEquals(4, bulkClient.getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
// close admin after bulk
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
adminClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue