From 6146da9554e8c0bf245fbc703da4b65a580e6518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Fri, 29 Jan 2021 15:51:03 +0100 Subject: [PATCH] cleaning up --- .../java/org/xbib/elx/api/AdminClient.java | 2 - .../java/org/xbib/elx/api/BulkClient.java | 1 - .../java/org/xbib/elx/api/BulkListener.java | 2 +- .../java/org/xbib/elx/api/BulkProcessor.java | 4 +- .../org/xbib/elx/api/IndexDefinition.java | 6 ++ .../xbib/elx/common/AbstractAdminClient.java | 20 ++----- .../xbib/elx/common/AbstractBasicClient.java | 3 +- .../xbib/elx/common/AbstractBulkClient.java | 58 +++++++++++------- .../xbib/elx/common/AbstractSearchClient.java | 8 ++- .../elx/common/DefaultBulkController.java | 8 ++- .../xbib/elx/common/DefaultBulkProcessor.java | 59 ++++++------------- .../xbib/elx/http/test/BulkClientTest.java | 2 +- .../org/xbib/elx/http/test/SearchTest.java | 6 +- .../xbib/elx/node/test/BulkClientTest.java | 2 +- .../org/xbib/elx/node/test/SearchTest.java | 22 +++++-- .../elx/transport/test/BulkClientTest.java | 2 +- .../xbib/elx/transport/test/SearchTest.java | 22 +++++-- 17 files changed, 121 insertions(+), 106 deletions(-) diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index f6e58fb..059430f 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -24,8 +24,6 @@ public interface AdminClient extends BasicClient { Map getMapping(String index) throws IOException; - Map getMapping(String index, String type) throws IOException; - void checkMapping(String index); /** diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 3c24d82..b12f49a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -165,7 +165,6 @@ public interface BulkClient extends BasicClient, Flushable { void startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException; - /** * Stop bulk mode. * diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java index 045204f..da80393 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java @@ -4,7 +4,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; /** - * A bulk listener for the execution. + * A bulk listener for following executions of bulk operations. */ public interface BulkListener { diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index d45170e..f6112bc 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -1,6 +1,6 @@ package org.xbib.elx.api; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import java.io.Closeable; import java.io.Flushable; @@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { - BulkProcessor add(ActionRequest request); + BulkProcessor add(DocWriteRequest request); boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 1c77ccb..02eda88 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -4,6 +4,12 @@ import java.util.concurrent.TimeUnit; public interface IndexDefinition { + /** + * The one and only index type name used in the extended client. + * Note that all Elasticsearch version < 6.2.0 do not allow a prepending "_". + */ + String TYPE_NAME = "_doc"; + IndexDefinition setIndex(String index); String getIndex(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index f888e47..03f9077 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -87,16 +87,12 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.xbib.elx.api.IndexDefinition.TYPE_NAME; + public abstract class AbstractAdminClient extends AbstractBasicClient implements AdminClient { private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); - /** - * The one and only index type name used in the extended client. - * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_". - */ - private static final String TYPE_NAME = "doc"; - private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() { @Override public List getMovedAliases() { @@ -133,17 +129,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public Map getMapping(String index) { - return getMapping(index, TYPE_NAME); - } - - @Override - public Map getMapping(String index, String mapping) { GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) .setIndices(index) - .setTypes(mapping); + .setTypes(TYPE_NAME); GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); - logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap()); - return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap(); + return getMappingsResponse.getMappings().get(index).get(TYPE_NAME).getSourceAsMap(); } @Override @@ -377,7 +367,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); - logger.info("{} indices", getIndexResponse.getIndices().length); + logger.info("found {} indices for pruning", getIndexResponse.getIndices().length); List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 6fc0547..72f445c 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -55,7 +55,7 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void init(Settings settings) throws IOException { if (closed.compareAndSet(false, true)) { - logger.log(Level.DEBUG, "initializing with settings = " + settings.toDelimitedString(',')); + logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(',')); this.settings = settings; setClient(createClient(settings)); } else { @@ -102,6 +102,7 @@ public abstract class AbstractBasicClient implements BasicClient { public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); + logger.log(Level.DEBUG, "waiting " + timeout + " for shard settling down"); ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() .waitForNoInitializingShards(true) .waitForNoRelocatingShards(true) diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 00f1be3..7aad329 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -4,7 +4,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushAction; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -31,6 +31,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.xbib.elx.api.IndexDefinition.TYPE_NAME; + public abstract class AbstractBulkClient extends AbstractBasicClient implements BulkClient { private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName()); @@ -43,7 +45,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void init(Settings settings) throws IOException { if (closed.compareAndSet(true, false)) { super.init(settings); - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); + logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(',')); bulkController = new DefaultBulkController(this); bulkController.init(settings); } else { @@ -96,32 +98,43 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { - String mappingString = Strings.toString(builder); - Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingString).mapOrdered(); - newIndex(index, settings, mappings); + public void newIndex(String index, Settings settings, Map mapping) throws IOException { + if (mapping == null || mapping.isEmpty()) { + newIndex(index, settings, (XContentBuilder) null); + } else { + newIndex(index, settings, JsonXContent.contentBuilder().map(mapping)); + } } @Override - public void newIndex(String index, Settings settings, Map mapping) throws IOException { + public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { if (index == null) { - logger.warn("no index name given to create index"); + logger.warn("unable to create index, no index name given"); return; } ensureClientIsPresent(); waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); - CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index); + CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) + .setIndex(index); if (settings != null) { - createIndexRequest.settings(settings); + createIndexRequestBuilder.setSettings(settings); } - if (mapping != null) { - createIndexRequest.mapping("_doc", mapping); - } - CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); + if (builder != null) { + createIndexRequestBuilder.addMapping(TYPE_NAME, builder); + logger.debug("adding mapping = {}", Strings.toString(builder)); + } else { + createIndexRequestBuilder.addMapping(TYPE_NAME, + JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject()); + logger.debug("empty mapping"); + } + + CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); if (createIndexResponse.isAcknowledged()) { logger.info("index {} created", index); + } else { + logger.warn("index creation of {} not acknowledged", index); } + refreshIndex(index); } @Override @@ -156,7 +169,8 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(String index, String id, boolean create, String source) { - return index(index, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); + return index(new IndexRequest().index(index).id(id).create(create) + .source(new BytesArray(source.getBytes(StandardCharsets.UTF_8)), XContentType.JSON)); } @Override @@ -167,8 +181,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexRequest indexRequest) { - ensureClientIsPresent(); - bulkController.bulkIndex(indexRequest); + if (bulkController != null) { + ensureClientIsPresent(); + bulkController.bulkIndex(indexRequest); + } return this; } @@ -179,8 +195,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient delete(DeleteRequest deleteRequest) { - ensureClientIsPresent(); - bulkController.bulkDelete(deleteRequest); + if (bulkController != null) { + ensureClientIsPresent(); + bulkController.bulkDelete(deleteRequest); + } return this; } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 56af64b..c0f544e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -132,8 +132,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchMetric.getCurrentQueries().dec(); searchMetric.getQueries().inc(); searchMetric.markTotalQueries(1); - boolean isempty = originalSearchResponse.getHits().getTotalHits().value == 0; - if (isempty) { + if (originalSearchResponse.getHits().getTotalHits().value == 0) { searchMetric.getEmptyQueries().inc(); } else { searchMetric.getSucceededQueries().inc(); @@ -150,6 +149,11 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement searchMetric.getCurrentQueries().dec(); searchMetric.getQueries().inc(); searchMetric.markTotalQueries(1); + if ( searchResponse1.getHits().getHits().length == 0) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } return searchResponse1; }); Predicate condition = searchResponse -> searchResponse.getHits().getHits().length > 0; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 4996c71..5cdf37c 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.xbib.elx.api.IndexDefinition.TYPE_NAME; + public class DefaultBulkController implements BulkController { private static final Logger logger = LogManager.getLogger(DefaultBulkController.class); @@ -126,8 +128,8 @@ public class DefaultBulkController implements BulkController { public void bulkIndex(IndexRequest indexRequest) { ensureActiveAndBulk(); try { - bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id()); bulkProcessor.add(indexRequest); + bulkMetric.getCurrentIngest().inc(indexRequest.index(), TYPE_NAME, indexRequest.id()); } catch (Exception e) { if (logger.isErrorEnabled()) { logger.error("bulk add of index failed: " + e.getMessage(), e); @@ -140,8 +142,8 @@ public class DefaultBulkController implements BulkController { public void bulkDelete(DeleteRequest deleteRequest) { ensureActiveAndBulk(); try { - bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id()); bulkProcessor.add(deleteRequest); + bulkMetric.getCurrentIngest().inc(deleteRequest.index(), TYPE_NAME, deleteRequest.id()); } catch (Exception e) { if (logger.isErrorEnabled()) { logger.error("bulk add of delete failed: " + e.getMessage(), e); @@ -154,8 +156,8 @@ public class DefaultBulkController implements BulkController { public void bulkUpdate(UpdateRequest updateRequest) { ensureActiveAndBulk(); try { - bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id()); bulkProcessor.add(updateRequest); + bulkMetric.getCurrentIngest().inc(updateRequest.index(), TYPE_NAME, updateRequest.id()); } catch (Exception e) { if (logger.isErrorEnabled()) { logger.error("bulk add of update failed: " + e.getMessage(), e); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 852b1fb..5906d38 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -1,13 +1,10 @@ package org.xbib.elx.common; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -83,6 +80,11 @@ public class DefaultBulkProcessor implements BulkProcessor { } } + @Override + public BulkListener getBulkListener() { + return bulkListener; + } + public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) { Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null"); @@ -143,11 +145,6 @@ public class DefaultBulkProcessor implements BulkProcessor { return bulkRequestHandler.close(timeout, unit); } - @Override - public BulkListener getBulkListener() { - return bulkListener; - } - /** * Adds either a delete or an index request. * @@ -155,8 +152,14 @@ public class DefaultBulkProcessor implements BulkProcessor { * @return his bulk processor */ @Override - public DefaultBulkProcessor add(ActionRequest request) { - internalAdd(request); + public synchronized DefaultBulkProcessor add(DocWriteRequest request) { + ensureOpen(); + bulkRequest.add(request); + if (bulkActions != -1 && + bulkRequest.numberOfActions() >= bulkActions || + bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) { + execute(); + } return this; } @@ -190,41 +193,13 @@ public class DefaultBulkProcessor implements BulkProcessor { } } - private synchronized void internalAdd(ActionRequest request) { - ensureOpen(); - if (request instanceof IndexRequest) { - bulkRequest.add((IndexRequest) request); - } else if (request instanceof DeleteRequest) { - bulkRequest.add((DeleteRequest) request); - } else if (request instanceof UpdateRequest) { - bulkRequest.add((UpdateRequest) request); - } else { - throw new UnsupportedOperationException(); - } - executeIfNeeded(); - } - - private void executeIfNeeded() { - ensureOpen(); - if (!isOverTheLimit()) { - return; - } - execute(); - } - private void execute() { - final BulkRequest myBulkRequest = this.bulkRequest; - final long executionId = executionIdGen.incrementAndGet(); + BulkRequest myBulkRequest = this.bulkRequest; + long executionId = executionIdGen.incrementAndGet(); this.bulkRequest = new BulkRequest(); this.bulkRequestHandler.execute(myBulkRequest, executionId); } - private boolean isOverTheLimit() { - return bulkActions != -1 && - bulkRequest.numberOfActions() >= bulkActions || - bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize; - } - /** * A builder used to create a build an instance of a bulk processor. */ @@ -393,7 +368,7 @@ public class DefaultBulkProcessor implements BulkProcessor { } @Override - public void execute(final BulkRequest bulkRequest, final long executionId) { + public void execute(BulkRequest bulkRequest, long executionId) { boolean bulkRequestSetupSuccessful = false; boolean acquired = false; try { diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java index 0464d01..648be9e 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java @@ -92,7 +92,7 @@ class BulkClientTest { .endObject() .endObject(); bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(adminClient.getMapping("test", "_doc").containsKey("properties")); + assertTrue(adminClient.getMapping("test").containsKey("properties")); } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java index 074293f..1d9aef6 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -56,6 +56,8 @@ class SearchTest { assertEquals(numactions, bulkClient.getSearchableDocs("test")); bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); bulkClient.flush(); + bulkClient.refreshIndex("test"); + assertEquals(numactions + 1, bulkClient.getSearchableDocs("test")); } assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { @@ -84,8 +86,8 @@ class SearchTest { }); assertEquals(numactions + 1, idcount.get()); assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(3, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index c84cf4c..74f2329 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -92,7 +92,7 @@ class BulkClientTest { .endObject() .endObject(); bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(adminClient.getMapping("test", "_doc").containsKey("properties")); + assertTrue(adminClient.getMapping("test").containsKey("properties")); } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index b0af05d..58ba344 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -15,6 +16,8 @@ import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeSearchClient; import org.xbib.elx.node.NodeSearchClientProvider; + +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -52,8 +55,13 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions + 1, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -62,12 +70,14 @@ class SearchTest { .setSearchClientProvider(NodeSearchClientProvider.class) .put(helper.getNodeSettings("1")) .build()) { + Optional responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0")); + assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString()); Stream stream = searchClient.search(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMinutes(1), 10); long count = stream.count(); - assertEquals(numactions, count); + assertEquals(numactions + 1, count); Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); @@ -76,10 +86,10 @@ class SearchTest { logger.info(id); idcount.incrementAndGet(); }); - assertEquals(numactions, idcount.get()); - assertEquals(13, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(numactions + 1, idcount.get()); + assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index c873251..45d2b07 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -93,7 +93,7 @@ class BulkClientTest { .endObject() .endObject(); bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(adminClient.getMapping("test", "_doc").containsKey("properties")); + assertTrue(adminClient.getMapping("test").containsKey("properties")); } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 808712b..fd24ebd 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -15,6 +16,8 @@ import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportSearchClient; import org.xbib.elx.transport.TransportSearchClientProvider; + +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -52,8 +55,13 @@ class SearchTest { bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}"); + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions + 1, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -62,12 +70,14 @@ class SearchTest { .setSearchClientProvider(TransportSearchClientProvider.class) .put(helper.getTransportSettings()) .build()) { + Optional responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0")); + assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString()); Stream stream = searchClient.search(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMinutes(1), 10); long count = stream.count(); - assertEquals(numactions, count); + assertEquals(numactions + 1, count); Stream ids = searchClient.getIds(qb -> qb .setIndices("test") .setQuery(QueryBuilders.matchAllQuery())); @@ -76,10 +86,10 @@ class SearchTest { logger.info(id); idcount.incrementAndGet(); }); - assertEquals(numactions, idcount.get()); - assertEquals(13, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(numactions + 1, idcount.get()); + assertEquals(15, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } }