bulk metric moved to controller, search metric has landed

bulk metric moved to controller, search metric has landed
This commit is contained in:
Jörg Prante 2020-05-27 11:57:50 +02:00
parent 5423cf0f11
commit 188708d0ed
25 changed files with 284 additions and 78 deletions

View file

@ -23,7 +23,6 @@ ext {
licenseUrl = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
subprojects {
apply plugin: 'java-library'

View file

@ -26,6 +26,8 @@ public interface AdminClient extends BasicClient {
Map<String, ?> getMapping(String index, String type) throws IOException;
void checkMapping(String index);
/**
* Delete an index.
* @param indexDefinition the index definition

View file

@ -14,13 +14,7 @@ import java.util.concurrent.TimeUnit;
public interface BulkClient extends BasicClient, Flushable {
/**
* Get bulk metric.
* @return the bulk metric
*/
BulkMetric getBulkMetric();
/**
* Get buulk control.
* Get bulk control.
* @return the bulk control
*/
BulkController getBulkController();

View file

@ -10,6 +10,8 @@ public interface BulkMetric extends Closeable {
void init(Settings settings);
void markTotalIngest(long n);
Metered getTotalIngest();
Count getTotalIngestSizeInBytes();

View file

@ -14,6 +14,8 @@ import java.util.stream.Stream;
public interface SearchClient extends BasicClient {
SearchMetric getSearchMetric();
Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilder);
Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilder);

View file

@ -0,0 +1,29 @@
package org.xbib.elx.api;
import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
import java.io.Closeable;
public interface SearchMetric extends Closeable {
void init(Settings settings);
void markTotalQueries(long n);
Metered getTotalQueries();
Count getCurrentQueries();
Count getQueries();
Count getSucceededQueries();
Count getEmptyQueries();
long elapsed();
void start();
void stop();
}

View file

@ -64,7 +64,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.ZoneId;
@ -134,12 +133,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
};
@Override
public Map<String, ?> getMapping(String index) throws IOException {
public Map<String, ?> getMapping(String index) {
return getMapping(index, TYPE_NAME);
}
@Override
public Map<String, ?> getMapping(String index, String mapping) throws IOException {
public Map<String, ?> getMapping(String index, String mapping) {
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
.setIndices(index)
.setTypes(mapping);
@ -506,6 +505,22 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
}
@Override
public void checkMapping(String index) {
ensureClientIsPresent();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
ImmutableOpenMap<String, MappingMetaData> mappings = map.get(stringObjectCursor.value);
for (ObjectObjectCursor<String, MappingMetaData> cursor : mappings) {
String mappingName = cursor.key;
MappingMetaData mappingMetaData = cursor.value;
checkMapping(index, mappingName, mappingMetaData);
}
});
}
private static String findSettingsFrom(String string) throws IOException {
if (string == null) {
return null;
@ -546,7 +561,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
}
return string;
} catch (MalformedInputException e) {
} catch (MalformedURLException e) {
return string;
}
}
@ -567,21 +582,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return result;
}
public void checkMapping(String index) {
ensureClientIsPresent();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> map = getMappingsResponse.getMappings();
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
ImmutableOpenMap<String, MappingMetaData> mappings = map.get(stringObjectCursor.value);
for (ObjectObjectCursor<String, MappingMetaData> cursor : mappings) {
String mappingName = cursor.key;
MappingMetaData mappingMetaData = cursor.value;
checkMapping(index, mappingName, mappingMetaData);
}
});
}
private void checkMapping(String index, String type, MappingMetaData mappingMetaData) {
try {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
@ -595,7 +595,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (total > 0L) {
Map<String, Long> fields = new TreeMap<>();
Map<String, Object> root = mappingMetaData.getSourceAsMap();
checkMapping(index, type, "", "", root, fields);
checkMapping(index, "", "", root, fields);
AtomicInteger empty = new AtomicInteger();
Map<String, Long> map = sortByValue(fields);
map.forEach((key, value) -> {
@ -616,7 +616,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
@SuppressWarnings("unchecked")
private void checkMapping(String index, String type,
private void checkMapping(String index,
String pathDef, String fieldName, Map<String, Object> map,
Map<String, Long> fields) {
String path = pathDef;
@ -641,7 +641,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
String fieldType = o instanceof String ? o.toString() : null;
// do not recurse into our custom field mapper
if (!"standardnumber".equals(fieldType) && !"ref".equals(fieldType)) {
checkMapping(index, type, path, key, child, fields);
checkMapping(index, path, key, child, fields);
}
} else if ("type".equals(key)) {
QueryBuilder filterBuilder = QueryBuilders.existsQuery(path);

View file

@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.xbib.elx.api.BulkClient;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -36,8 +35,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName());
private BulkMetric bulkMetric;
private BulkController bulkController;
private final AtomicBoolean closed = new AtomicBoolean(true);
@ -47,20 +44,13 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
if (closed.compareAndSet(true, false)) {
super.init(settings);
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
bulkMetric = new DefaultBulkMetric();
bulkMetric.init(settings);
bulkController = new DefaultBulkController(this, bulkMetric);
bulkController = new DefaultBulkController(this);
bulkController.init(settings);
} else {
logger.log(Level.WARN, "not initializing");
}
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public BulkController getBulkController() {
return bulkController;
@ -77,10 +67,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
ensureClientIsPresent();
if (bulkMetric != null) {
logger.info("closing bulk metric");
bulkMetric.close();
}
if (bulkController != null) {
logger.info("closing bulk controller");
bulkController.close();

View file

@ -1,6 +1,7 @@
package org.xbib.elx.common;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
@ -15,9 +16,12 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.xbib.elx.api.SearchClient;
import org.xbib.elx.api.SearchMetric;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
@ -30,11 +34,43 @@ import java.util.stream.StreamSupport;
public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient {
private SearchMetric searchMetric;
@Override
public SearchMetric getSearchMetric() {
return searchMetric;
}
@Override
public void init(Settings settings) throws IOException {
super.init(settings);
this.searchMetric = new DefaultSearchMetric();
searchMetric.init(settings);
}
@Override
public void close() throws IOException {
super.close();
if (searchMetric != null) {
searchMetric.close();
}
}
@Override
public Optional<GetResponse> get(Consumer<GetRequestBuilder> getRequestBuilderConsumer) {
GetRequestBuilder getRequestBuilder = new GetRequestBuilder(client, GetAction.INSTANCE);
getRequestBuilderConsumer.accept(getRequestBuilder);
GetResponse getResponse = getRequestBuilder.execute().actionGet();
ActionFuture<GetResponse> actionFuture = getRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
GetResponse getResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (getResponse.isExists()) {
searchMetric.getSucceededQueries().inc();
} else {
searchMetric.getEmptyQueries().inc();
}
return getResponse.isExists() ? Optional.of(getResponse) : Optional.empty();
}
@ -42,23 +78,46 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
public Optional<MultiGetResponse> multiGet(Consumer<MultiGetRequestBuilder> multiGetRequestBuilderConsumer) {
MultiGetRequestBuilder multiGetRequestBuilder = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
multiGetRequestBuilderConsumer.accept(multiGetRequestBuilder);
MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.execute().actionGet();
return multiGetItemResponse.getResponses().length == 0 ? Optional.empty() : Optional.of(multiGetItemResponse);
ActionFuture<MultiGetResponse> actionFuture = multiGetRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
MultiGetResponse multiGetItemResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
boolean isempty = multiGetItemResponse.getResponses().length == 0;
if (isempty) {
searchMetric.getEmptyQueries().inc();
} else {
searchMetric.getSucceededQueries().inc();
}
return isempty ? Optional.empty() : Optional.of(multiGetItemResponse);
}
@Override
public Optional<SearchResponse> search(Consumer<SearchRequestBuilder> queryBuilder) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
SearchResponse searchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
if (searchResponse.getFailedShards() > 0) {
StringBuilder sb = new StringBuilder("Search failed:");
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
sb.append("\n").append(failure.reason());
}
searchMetric.getEmptyQueries().inc();
throw new ElasticsearchException(sb.toString());
}
return searchResponse.getHits().getHits().length == 0 ? Optional.empty() : Optional.of(searchResponse);
boolean isempty = searchResponse.getHits().getHits().length == 0;
if (isempty) {
searchMetric.getEmptyQueries().inc();
} else {
searchMetric.getSucceededQueries().inc();
}
return isempty ? Optional.empty() : Optional.of(searchResponse);
}
@Override
@ -67,12 +126,32 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE);
queryBuilder.accept(searchRequestBuilder);
searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize);
SearchResponse originalSearchResponse = searchRequestBuilder.execute().actionGet();
ActionFuture<SearchResponse> actionFuture = searchRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
SearchResponse originalSearchResponse = actionFuture.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
boolean isempty = originalSearchResponse.getHits().getTotalHits().value == 0;
if (isempty) {
searchMetric.getEmptyQueries().inc();
} else {
searchMetric.getSucceededQueries().inc();
}
Stream<SearchResponse> infiniteResponses = Stream.iterate(originalSearchResponse,
searchResponse -> new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
.setScrollId(searchResponse.getScrollId())
.setScroll(scrollTime)
.execute().actionGet());
searchResponse -> {
SearchScrollRequestBuilder searchScrollRequestBuilder =
new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
.setScrollId(searchResponse.getScrollId())
.setScroll(scrollTime);
ActionFuture<SearchResponse> actionFuture1 = searchScrollRequestBuilder.execute();
searchMetric.getCurrentQueries().inc();
SearchResponse searchResponse1 = actionFuture1.actionGet();
searchMetric.getCurrentQueries().dec();
searchMetric.getQueries().inc();
searchMetric.markTotalQueries(1);
return searchResponse1;
});
Predicate<SearchResponse> condition = searchResponse -> searchResponse.getHits().getHits().length > 0;
Consumer<SearchResponse> lastAction = searchResponse -> {
ClearScrollRequestBuilder clearScrollRequestBuilder =

View file

@ -45,9 +45,9 @@ public class DefaultBulkController implements BulkController {
private final AtomicBoolean active;
public DefaultBulkController(BulkClient bulkClient, BulkMetric bulkMetric) {
public DefaultBulkController(BulkClient bulkClient) {
this.bulkClient = bulkClient;
this.bulkMetric = bulkMetric;
this.bulkMetric = new DefaultBulkMetric();
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
@ -68,6 +68,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void init(Settings settings) {
bulkMetric.init(settings);
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(),
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(),

View file

@ -51,6 +51,7 @@ public class DefaultBulkListener implements BulkListener {
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
bulkMetric.markTotalIngest(response.getItems().length);
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());

View file

@ -44,6 +44,11 @@ public class DefaultBulkMetric implements BulkMetric {
start();
}
@Override
public void markTotalIngest(long n) {
totalIngest.mark(n);
}
@Override
public Metered getTotalIngest() {
return totalIngest;

View file

@ -0,0 +1,97 @@
package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.SearchMetric;
import org.xbib.metrics.api.Count;
import org.xbib.metrics.api.Metered;
import org.xbib.metrics.common.CountMetric;
import org.xbib.metrics.common.Meter;
import java.util.concurrent.Executors;
public class DefaultSearchMetric implements SearchMetric {
private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName());
private final Meter totalQuery;
private final Count currentQuery;
private final Count queries;
private final Count succeededQueries;
private final Count emptyQueries;
private Long started;
private Long stopped;
public DefaultSearchMetric() {
totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor());
currentQuery = new CountMetric();
queries = new CountMetric();
succeededQueries = new CountMetric();
emptyQueries = new CountMetric();
}
@Override
public void init(Settings settings) {
logger.info("init");
start();
}
@Override
public void markTotalQueries(long n) {
totalQuery.mark(n);
}
@Override
public Metered getTotalQueries() {
return totalQuery;
}
@Override
public Count getCurrentQueries() {
return currentQuery;
}
@Override
public Count getQueries() {
return queries;
}
@Override
public Count getSucceededQueries() {
return succeededQueries;
}
@Override
public Count getEmptyQueries() {
return emptyQueries;
}
@Override
public long elapsed() {
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;
}
@Override
public void start() {
this.started = System.nanoTime();
totalQuery.start(5L);
}
@Override
public void stop() {
this.stopped = System.nanoTime();
totalQuery.stop();
}
@Override
public void close() {
stop();
totalQuery.shutdown();
}
}

View file

@ -53,7 +53,7 @@ class BulkClientTest {
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@ -113,7 +113,7 @@ class BulkClientTest {
client.flush();
client.waitForResponses(30L, TimeUnit.SECONDS);
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}
@ -168,7 +168,7 @@ class BulkClientTest {
logger.warn("latch timeout");
}
bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS);
assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
} finally {
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());

View file

@ -51,7 +51,7 @@ class DuplicateIDTest {
assertTrue(hits < ACTIONS);
} finally {
client.close();
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount());
if (client.getBulkController().getLastBulkError() != null) {
logger.error("error", client.getBulkController().getLastBulkError());
}

View file

@ -53,7 +53,7 @@ class SearchTest {
bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
}
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -77,6 +77,9 @@ class SearchTest {
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
assertEquals(13, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
}
}

View file

@ -63,8 +63,8 @@ class SmokeTest {
adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}

View file

@ -53,7 +53,7 @@ class BulkClientTest {
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
} finally {
assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -114,7 +114,7 @@ class BulkClientTest {
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
} finally {
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -166,7 +166,7 @@ class BulkClientTest {
logger.warn("latch timeout");
}
bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS);
assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
} finally {
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());

View file

@ -50,7 +50,7 @@ class DuplicateIDTest {
assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS);
} finally {
bulkClient.close();
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}

View file

@ -53,7 +53,7 @@ class SearchTest {
bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
}
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -77,6 +77,9 @@ class SearchTest {
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
assertEquals(13, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
}
}

View file

@ -63,8 +63,8 @@ class SmokeTest {
adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}

View file

@ -54,7 +54,7 @@ class BulkClientTest {
bulkClient.flush();
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
} finally {
assertEquals(1, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -117,7 +117,7 @@ class BulkClientTest {
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -172,7 +172,7 @@ class BulkClientTest {
logger.warn("latch timeout");
}
bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS);
assertEquals(maxthreads * actions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} catch (Exception e) {

View file

@ -50,7 +50,7 @@ class DuplicateIDTest {
assertTrue(bulkClient.getSearchableDocs("test_dup") < ACTIONS);
} finally {
bulkClient.close();
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}

View file

@ -53,7 +53,7 @@ class SearchTest {
bulkClient.refreshIndex("test");
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
}
assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}
@ -77,6 +77,9 @@ class SearchTest {
idcount.incrementAndGet();
});
assertEquals(numactions, idcount.get());
assertEquals(13, searchClient.getSearchMetric().getQueries().getCount());
assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount());
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
}
}
}

View file

@ -63,8 +63,8 @@ class SmokeTest {
adminClient.updateReplicaLevel(indexDefinition, 2);
int replica = adminClient.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount());
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
if (bulkClient.getBulkController().getLastBulkError() != null) {
logger.error("error", bulkClient.getBulkController().getLastBulkError());
}