From 188708d0ed77d63549e15f36fa9f84b162e59f06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 27 May 2020 11:57:50 +0200 Subject: [PATCH] bulk metric moved to controller, search metric has landed bulk metric moved to controller, search metric has landed --- build.gradle | 1 - .../java/org/xbib/elx/api/AdminClient.java | 2 + .../java/org/xbib/elx/api/BulkClient.java | 8 +- .../java/org/xbib/elx/api/BulkMetric.java | 2 + .../java/org/xbib/elx/api/SearchClient.java | 2 + .../java/org/xbib/elx/api/SearchMetric.java | 29 ++++++ .../xbib/elx/common/AbstractAdminClient.java | 44 ++++----- .../xbib/elx/common/AbstractBulkClient.java | 16 +-- .../xbib/elx/common/AbstractSearchClient.java | 99 +++++++++++++++++-- .../elx/common/DefaultBulkController.java | 5 +- .../xbib/elx/common/DefaultBulkListener.java | 1 + .../xbib/elx/common/DefaultBulkMetric.java | 5 + .../xbib/elx/common/DefaultSearchMetric.java | 97 ++++++++++++++++++ .../xbib/elx/http/test/BulkClientTest.java | 6 +- .../xbib/elx/http/test/DuplicateIDTest.java | 2 +- .../org/xbib/elx/http/test/SearchTest.java | 5 +- .../org/xbib/elx/http/test/SmokeTest.java | 4 +- .../xbib/elx/node/test/BulkClientTest.java | 6 +- .../xbib/elx/node/test/DuplicateIDTest.java | 2 +- .../org/xbib/elx/node/test/SearchTest.java | 5 +- .../org/xbib/elx/node/test/SmokeTest.java | 4 +- .../elx/transport/test/BulkClientTest.java | 6 +- .../elx/transport/test/DuplicateIDTest.java | 2 +- .../xbib/elx/transport/test/SearchTest.java | 5 +- .../xbib/elx/transport/test/SmokeTest.java | 4 +- 25 files changed, 284 insertions(+), 78 deletions(-) create mode 100644 elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java create mode 100644 elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java diff --git a/build.gradle b/build.gradle index 0921dd9..6f4353e 100644 --- a/build.gradle +++ b/build.gradle @@ -23,7 +23,6 @@ ext { licenseUrl = 'http://www.apache.org/licenses/LICENSE-2.0.txt' } - subprojects { apply plugin: 'java-library' diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index d43a89a..a7264ab 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -26,6 +26,8 @@ public interface AdminClient extends BasicClient { Map getMapping(String index, String type) throws IOException; + void checkMapping(String index); + /** * Delete an index. * @param indexDefinition the index definition diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index e7ffc59..3c24d82 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -14,13 +14,7 @@ import java.util.concurrent.TimeUnit; public interface BulkClient extends BasicClient, Flushable { /** - * Get bulk metric. - * @return the bulk metric - */ - BulkMetric getBulkMetric(); - - /** - * Get buulk control. + * Get bulk control. * @return the bulk control */ BulkController getBulkController(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index af825e5..b7d11e0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -10,6 +10,8 @@ public interface BulkMetric extends Closeable { void init(Settings settings); + void markTotalIngest(long n); + Metered getTotalIngest(); Count getTotalIngestSizeInBytes(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java index 3a04949..4932bcf 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java @@ -14,6 +14,8 @@ import java.util.stream.Stream; public interface SearchClient extends BasicClient { + SearchMetric getSearchMetric(); + Optional get(Consumer getRequestBuilder); Optional multiGet(Consumer multiGetRequestBuilder); diff --git a/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java new file mode 100644 index 0000000..e0cdb96 --- /dev/null +++ b/elx-api/src/main/java/org/xbib/elx/api/SearchMetric.java @@ -0,0 +1,29 @@ +package org.xbib.elx.api; + +import org.elasticsearch.common.settings.Settings; +import org.xbib.metrics.api.Count; +import org.xbib.metrics.api.Metered; +import java.io.Closeable; + +public interface SearchMetric extends Closeable { + + void init(Settings settings); + + void markTotalQueries(long n); + + Metered getTotalQueries(); + + Count getCurrentQueries(); + + Count getQueries(); + + Count getSucceededQueries(); + + Count getEmptyQueries(); + + long elapsed(); + + void start(); + + void stop(); +} diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 6219839..4bf3ab4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -64,7 +64,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.MalformedInputException; import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.ZoneId; @@ -134,12 +133,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements }; @Override - public Map getMapping(String index) throws IOException { + public Map getMapping(String index) { return getMapping(index, TYPE_NAME); } @Override - public Map getMapping(String index, String mapping) throws IOException { + public Map getMapping(String index, String mapping) { GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) .setIndices(index) .setTypes(mapping); @@ -506,6 +505,22 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } + @Override + public void checkMapping(String index) { + ensureClientIsPresent(); + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); + GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); + ImmutableOpenMap> map = getMappingsResponse.getMappings(); + map.keys().forEach((Consumer>) stringObjectCursor -> { + ImmutableOpenMap mappings = map.get(stringObjectCursor.value); + for (ObjectObjectCursor cursor : mappings) { + String mappingName = cursor.key; + MappingMetaData mappingMetaData = cursor.value; + checkMapping(index, mappingName, mappingMetaData); + } + }); + } + private static String findSettingsFrom(String string) throws IOException { if (string == null) { return null; @@ -546,7 +561,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } return string; - } catch (MalformedInputException e) { + } catch (MalformedURLException e) { return string; } } @@ -567,21 +582,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return result; } - public void checkMapping(String index) { - ensureClientIsPresent(); - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); - GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); - ImmutableOpenMap> map = getMappingsResponse.getMappings(); - map.keys().forEach((Consumer>) stringObjectCursor -> { - ImmutableOpenMap mappings = map.get(stringObjectCursor.value); - for (ObjectObjectCursor cursor : mappings) { - String mappingName = cursor.key; - MappingMetaData mappingMetaData = cursor.value; - checkMapping(index, mappingName, mappingMetaData); - } - }); - } - private void checkMapping(String index, String type, MappingMetaData mappingMetaData) { try { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) @@ -595,7 +595,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (total > 0L) { Map fields = new TreeMap<>(); Map root = mappingMetaData.getSourceAsMap(); - checkMapping(index, type, "", "", root, fields); + checkMapping(index, "", "", root, fields); AtomicInteger empty = new AtomicInteger(); Map map = sortByValue(fields); map.forEach((key, value) -> { @@ -616,7 +616,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @SuppressWarnings("unchecked") - private void checkMapping(String index, String type, + private void checkMapping(String index, String pathDef, String fieldName, Map map, Map fields) { String path = pathDef; @@ -641,7 +641,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements String fieldType = o instanceof String ? o.toString() : null; // do not recurse into our custom field mapper if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) { - checkMapping(index, type, path, key, child, fields); + checkMapping(index, path, key, child, fields); } } else if ("type".equals(key)) { QueryBuilder filterBuilder = QueryBuilders.existsQuery(path); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 29d2940..00f1be3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; -import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -36,8 +35,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName()); - private BulkMetric bulkMetric; - private BulkController bulkController; private final AtomicBoolean closed = new AtomicBoolean(true); @@ -47,20 +44,13 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (closed.compareAndSet(true, false)) { super.init(settings); logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); - bulkMetric = new DefaultBulkMetric(); - bulkMetric.init(settings); - bulkController = new DefaultBulkController(this, bulkMetric); + bulkController = new DefaultBulkController(this); bulkController.init(settings); } else { logger.log(Level.WARN, "not initializing"); } } - @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - @Override public BulkController getBulkController() { return bulkController; @@ -77,10 +67,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements public void close() throws IOException { if (closed.compareAndSet(false, true)) { ensureClientIsPresent(); - if (bulkMetric != null) { - logger.info("closing bulk metric"); - bulkMetric.close(); - } if (bulkController != null) { logger.info("closing bulk controller"); bulkController.close(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 4dfd3b7..56af64b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -1,6 +1,7 @@ package org.xbib.elx.common; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; @@ -15,9 +16,12 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.xbib.elx.api.SearchClient; +import org.xbib.elx.api.SearchMetric; +import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.Optional; @@ -30,11 +34,43 @@ import java.util.stream.StreamSupport; public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient { + private SearchMetric searchMetric; + + @Override + public SearchMetric getSearchMetric() { + return searchMetric; + } + + @Override + public void init(Settings settings) throws IOException { + super.init(settings); + this.searchMetric = new DefaultSearchMetric(); + searchMetric.init(settings); + } + + @Override + public void close() throws IOException { + super.close(); + if (searchMetric != null) { + searchMetric.close(); + } + } + @Override public Optional get(Consumer getRequestBuilderConsumer) { GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE); getRequestBuilderConsumer.accept(getRequestBuilder); - GetResponse getResponse = getRequestBuilder.execute().actionGet(); + ActionFuture actionFuture = getRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + GetResponse getResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + if (getResponse.isExists()) { + searchMetric.getSucceededQueries().inc(); + } else { + searchMetric.getEmptyQueries().inc(); + } return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty(); } @@ -42,23 +78,46 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement public Optional multiGet(Consumer multiGetRequestBuilderConsumer) { MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE); multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder); - MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.execute().actionGet(); - return multiGetItemResponse.getResponses().length == 0 ? Optional.empty() : Optional.of(multiGetItemResponse); + ActionFuture actionFuture = multiGetRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + MultiGetResponse multiGetItemResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + boolean isempty = multiGetItemResponse.getResponses().length == 0; + if (isempty) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } + return isempty ? Optional.empty() : Optional.of(multiGetItemResponse); } @Override public Optional search(Consumer queryBuilder) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); - SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); + ActionFuture actionFuture = searchRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + SearchResponse searchResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); if (searchResponse.getFailedShards() > 0) { StringBuilder sb = new StringBuilder("Search failed:"); for (ShardSearchFailure failure : searchResponse.getShardFailures()) { sb.append("\n").append(failure.reason()); } + searchMetric.getEmptyQueries().inc(); throw new ElasticsearchException(sb.toString()); } - return searchResponse.getHits().getHits().length == 0 ? Optional.empty() : Optional.of(searchResponse); + boolean isempty = searchResponse.getHits().getHits().length == 0; + if (isempty) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } + return isempty ? Optional.empty() : Optional.of(searchResponse); } @Override @@ -67,12 +126,32 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); - SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet(); + ActionFuture actionFuture = searchRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + SearchResponse originalSearchResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + boolean isempty = originalSearchResponse.getHits().getTotalHits().value == 0; + if (isempty) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } Stream infiniteResponses = Stream.iterate(originalSearchResponse, - searchResponse -> new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) - .setScrollId(searchResponse.getScrollId()) - .setScroll(scrollTime) - .execute().actionGet()); + searchResponse -> { + SearchScrollRequestBuilder searchScrollRequestBuilder = + new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) + .setScrollId(searchResponse.getScrollId()) + .setScroll(scrollTime); + ActionFuture actionFuture1 = searchScrollRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + SearchResponse searchResponse1 = actionFuture1.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + return searchResponse1; + }); Predicate condition = searchResponse -> searchResponse.getHits().getHits().length > 0; Consumer lastAction = searchResponse -> { ClearScrollRequestBuilder clearScrollRequestBuilder = diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 0420fce..9c935dc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -45,9 +45,9 @@ public class DefaultBulkController implements BulkController { private final AtomicBoolean active; - public DefaultBulkController(BulkClient bulkClient, BulkMetric bulkMetric) { + public DefaultBulkController(BulkClient bulkClient) { this.bulkClient = bulkClient; - this.bulkMetric = bulkMetric; + this.bulkMetric = new DefaultBulkMetric(); this.indexNames = new ArrayList<>(); this.active = new AtomicBoolean(false); this.startBulkRefreshIntervals = new HashMap<>(); @@ -68,6 +68,7 @@ public class DefaultBulkController implements BulkController { @Override public void init(Settings settings) { + bulkMetric.init(settings); int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index c57da2d..572e717 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -51,6 +51,7 @@ public class DefaultBulkListener implements BulkListener { long l = bulkMetric.getCurrentIngest().getCount(); bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); + bulkMetric.markTotalIngest(response.getItems().length); int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 8127e29..300e227 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -44,6 +44,11 @@ public class DefaultBulkMetric implements BulkMetric { start(); } + @Override + public void markTotalIngest(long n) { + totalIngest.mark(n); + } + @Override public Metered getTotalIngest() { return totalIngest; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java new file mode 100644 index 0000000..5f164b3 --- /dev/null +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -0,0 +1,97 @@ +package org.xbib.elx.common; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.Settings; +import org.xbib.elx.api.SearchMetric; +import org.xbib.metrics.api.Count; +import org.xbib.metrics.api.Metered; +import org.xbib.metrics.common.CountMetric; +import org.xbib.metrics.common.Meter; +import java.util.concurrent.Executors; + +public class DefaultSearchMetric implements SearchMetric { + + private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName()); + + private final Meter totalQuery; + + private final Count currentQuery; + + private final Count queries; + + private final Count succeededQueries; + + private final Count emptyQueries; + + private Long started; + + private Long stopped; + + public DefaultSearchMetric() { + totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor()); + currentQuery = new CountMetric(); + queries = new CountMetric(); + succeededQueries = new CountMetric(); + emptyQueries = new CountMetric(); + } + + @Override + public void init(Settings settings) { + logger.info("init"); + start(); + } + + @Override + public void markTotalQueries(long n) { + totalQuery.mark(n); + } + + @Override + public Metered getTotalQueries() { + return totalQuery; + } + + @Override + public Count getCurrentQueries() { + return currentQuery; + } + + @Override + public Count getQueries() { + return queries; + } + + @Override + public Count getSucceededQueries() { + return succeededQueries; + } + + @Override + public Count getEmptyQueries() { + return emptyQueries; + } + + @Override + public long elapsed() { + return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L; + } + + @Override + public void start() { + this.started = System.nanoTime(); + totalQuery.start(5L); + } + + @Override + public void stop() { + this.stopped = System.nanoTime(); + totalQuery.stop(); + } + + @Override + public void close() { + stop(); + totalQuery.shutdown(); + } +} diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java index 1a3c231..0464d01 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java @@ -53,7 +53,7 @@ class BulkClientTest { client.flush(); client.waitForResponses(30L, TimeUnit.SECONDS); } finally { - assertEquals(1, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } @@ -113,7 +113,7 @@ class BulkClientTest { client.flush(); client.waitForResponses(30L, TimeUnit.SECONDS); } finally { - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } @@ -168,7 +168,7 @@ class BulkClientTest { logger.warn("latch timeout"); } bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); - assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); } finally { if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java index 44cff60..066ea7c 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java @@ -51,7 +51,7 @@ class DuplicateIDTest { assertTrue(hits < ACTIONS); } finally { client.close(); - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java index 1b126e9..3ed603e 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -53,7 +53,7 @@ class SearchTest { bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -77,6 +77,9 @@ class SearchTest { idcount.incrementAndGet(); }); assertEquals(numactions, idcount.get()); + assertEquals(13, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java index bd2072f..fc05334 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java @@ -63,8 +63,8 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 92103f5..777b1c0 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -53,7 +53,7 @@ class BulkClientTest { bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); } finally { - assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -114,7 +114,7 @@ class BulkClientTest { bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); } finally { - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -166,7 +166,7 @@ class BulkClientTest { logger.warn("latch timeout"); } bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); - assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); } finally { if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index d8d76f9..098a235 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -50,7 +50,7 @@ class DuplicateIDTest { assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); } finally { bulkClient.close(); - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 33dd290..27556c3 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -53,7 +53,7 @@ class SearchTest { bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -77,6 +77,9 @@ class SearchTest { idcount.incrementAndGet(); }); assertEquals(numactions, idcount.get()); + assertEquals(13, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index c7badca..5395a3b 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -63,8 +63,8 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index b3a1df8..c873251 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -54,7 +54,7 @@ class BulkClientTest { bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); } finally { - assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -117,7 +117,7 @@ class BulkClientTest { } catch (Exception e) { logger.error(e.getMessage(), e); } finally { - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -172,7 +172,7 @@ class BulkClientTest { logger.warn("latch timeout"); } bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); - assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } catch (Exception e) { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index ffd23d2..c2c8dae 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -50,7 +50,7 @@ class DuplicateIDTest { assertTrue(bulkClient.getSearchableDocs("test_dup") < ACTIONS); } finally { bulkClient.close(); - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index c923a57..808712b 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -53,7 +53,7 @@ class SearchTest { bulkClient.refreshIndex("test"); assertEquals(numactions, bulkClient.getSearchableDocs("test")); } - assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } @@ -77,6 +77,9 @@ class SearchTest { idcount.incrementAndGet(); }); assertEquals(numactions, idcount.get()); + assertEquals(13, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index cea2907..53e5758 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -63,8 +63,8 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); }