make metric optional, but set bulk volume to max if bulk metric is disabled, default is bulk metric enabled
This commit is contained in:
parent
7595fbaeee
commit
3dd47fc953
12 changed files with 98 additions and 61 deletions
|
@ -42,7 +42,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
|
||||
private final DefaultBulkListener bulkListener;
|
||||
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private ScheduledFuture<?> flushIntervalFuture;
|
||||
|
||||
private BulkRequest bulkRequest;
|
||||
|
||||
|
@ -66,16 +66,21 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
Parameters.BULK_FLUSH_INTERVAL.getString());
|
||||
TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr,
|
||||
TimeValue.timeValueSeconds(30), "");
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
this.client = bulkClient.getClient();
|
||||
if (flushInterval.millis() > 0L) {
|
||||
this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
|
||||
this.flushIntervalFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
|
||||
flushInterval.millis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
this.bulkListener = new DefaultBulkListener(this, settings);
|
||||
this.bulkActions = maxActionsPerRequest;
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
this.bulkVolume = minVolumePerRequest.getBytes();
|
||||
if (!isBulkMetricEnabled()) {
|
||||
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m"));
|
||||
this.bulkVolume = maxVolumePerRequest.getBytes();
|
||||
}
|
||||
this.bulkRequest = new BulkRequest();
|
||||
this.closed = new AtomicBoolean(false);
|
||||
this.enabled = new AtomicBoolean(false);
|
||||
|
@ -178,8 +183,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
public synchronized void close() throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
try {
|
||||
if (scheduledFuture != null) {
|
||||
scheduledFuture.cancel(true);
|
||||
if (flushIntervalFuture != null) {
|
||||
flushIntervalFuture.cancel(true);
|
||||
}
|
||||
// like flush but without ensuring open
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
|
|
|
@ -94,6 +94,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
if (settings.get("settings") != null && settings.get("mapping") != null) {
|
||||
setSettings(findSettingsFrom(settings.get("settings")));
|
||||
setMappings(findMappingsFrom(settings.get("mapping")));
|
||||
}
|
||||
boolean shift = settings.getAsBoolean("shift", false);
|
||||
setShift(shift);
|
||||
if (shift) {
|
||||
|
@ -116,7 +117,6 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIndex(String index) {
|
||||
|
|
|
@ -26,12 +26,12 @@ public enum Parameters {
|
|||
|
||||
BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"),
|
||||
|
||||
BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
|
||||
|
||||
BULK_METRIC_ENABLED("bulk.metric.enabled", Boolean.class, Boolean.FALSE),
|
||||
BULK_METRIC_ENABLED("bulk.metric.enabled", Boolean.class, Boolean.TRUE),
|
||||
|
||||
BULK_METRIC_LOG_INTERVAL("bulk.metric.log_interval", String.class, "10s"),
|
||||
|
||||
BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
|
||||
|
||||
BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
|
||||
|
||||
BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
|
||||
|
|
|
@ -53,7 +53,9 @@ class BulkClientTest {
|
|||
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
@ -75,7 +77,9 @@ class BulkClientTest {
|
|||
bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
@ -118,7 +122,9 @@ class BulkClientTest {
|
|||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -46,7 +46,9 @@ class DuplicateIDTest {
|
|||
bulkClient.refreshIndex(indexDefinition);
|
||||
long hits = bulkClient.getSearchableDocs(indexDefinition);
|
||||
assertTrue(hits < ACTIONS);
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -49,7 +49,9 @@ class SearchTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
@ -73,6 +75,7 @@ class SearchTest {
|
|||
idcount.incrementAndGet();
|
||||
});
|
||||
assertEquals(numactions, idcount.get());
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(275, searchClient.getSearchMetric().getQueries().getCount());
|
||||
assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
|
@ -81,3 +84,4 @@ class SearchTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,8 +63,10 @@ class SmokeTest {
|
|||
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
|
||||
adminClient.updateReplicaLevel(indexDefinition);
|
||||
assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -52,7 +52,9 @@ class BulkClientTest {
|
|||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
@ -76,7 +78,9 @@ class BulkClientTest {
|
|||
}
|
||||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
@ -120,7 +124,9 @@ class BulkClientTest {
|
|||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -45,7 +45,9 @@ class DuplicateIDTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -52,7 +52,9 @@ class SearchTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
@ -69,9 +71,11 @@ class SearchTest {
|
|||
TimeValue.timeValueMillis(100), 579);
|
||||
long count = stream.count();
|
||||
assertEquals(numactions, count);
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
// test stream docs
|
||||
stream = searchClient.search(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
|
@ -80,9 +84,11 @@ class SearchTest {
|
|||
final AtomicInteger hitcount = new AtomicInteger();
|
||||
stream.forEach(hit -> hitcount.incrementAndGet());
|
||||
assertEquals(numactions, hitcount.get());
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount());
|
||||
assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount());
|
||||
assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
}
|
||||
// test stream doc ids
|
||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
|
@ -90,6 +96,7 @@ class SearchTest {
|
|||
final AtomicInteger idcount = new AtomicInteger();
|
||||
ids.forEach(id -> idcount.incrementAndGet());
|
||||
assertEquals(numactions, idcount.get());
|
||||
if (searchClient.isSearchMetricEnabled()) {
|
||||
assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount());
|
||||
assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||
assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||
|
@ -98,3 +105,4 @@ class SearchTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,8 +66,10 @@ class SmokeTest {
|
|||
assertTrue(bulkClient.waitForResponses(30, TimeUnit.SECONDS));
|
||||
adminClient.updateReplicaLevel(indexDefinition);
|
||||
assertEquals(1, adminClient.getReplicaLevel(indexDefinition));
|
||||
if (bulkClient.getBulkProcessor().isBulkMetricEnabled()) {
|
||||
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
}
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
|
|
|
@ -163,8 +163,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
|
|||
.put(Parameters.PORT.getName(), port)
|
||||
.put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
|
||||
.put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
|
||||
//.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
|
||||
//.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
|
||||
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.FALSE)
|
||||
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.FALSE)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue