Prefix the bulk parameter names with BULK_, and fine-tune the automatic volume adjustment for bulk indexing (ezb web with 72 MB/s)
This commit is contained in:
parent
a19f09ab1f
commit
45865d656d
12 changed files with 69 additions and 55 deletions
|
@ -305,15 +305,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
logger.info("before pruning: index = {} full index = {} delta = {} mintokeep = {} pattern = {}",
|
||||
index, protectedIndexName, delta, mintokeep, pattern);
|
||||
if (delta == 0 && mintokeep == 0) {
|
||||
logger.info("no candidates found, delta is 0 and mintokeep is 0");
|
||||
return new EmptyPruneResult();
|
||||
}
|
||||
if (index.equals(protectedIndexName)) {
|
||||
logger.info("no candidates found, only protected index name is given");
|
||||
return new EmptyPruneResult();
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
|
||||
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
|
||||
logger.info("before pruning: found total of {} indices", getIndexResponse.getIndices().length);
|
||||
logger.info("before pruning: protected = " + protectedIndexName + " found total of {} indices", getIndexResponse.getIndices().length);
|
||||
List<String> candidateIndices = new ArrayList<>();
|
||||
for (String s : getIndexResponse.getIndices()) {
|
||||
Matcher m = pattern.matcher(s);
|
||||
|
@ -322,6 +324,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
}
|
||||
}
|
||||
if (candidateIndices.isEmpty()) {
|
||||
logger.info("no candidates found");
|
||||
return new EmptyPruneResult();
|
||||
}
|
||||
if (mintokeep > 0 && candidateIndices.size() <= mintokeep) {
|
||||
|
|
|
@ -31,10 +31,10 @@ public class DefaultBulkListener implements BulkListener {
|
|||
ScheduledThreadPoolExecutor scheduler,
|
||||
Settings settings) {
|
||||
this.bulkProcessor = bulkProcessor;
|
||||
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(),
|
||||
Parameters.ENABLE_BULK_LOGGING.getBoolean());
|
||||
boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
|
||||
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
|
||||
boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(),
|
||||
Parameters.BULK_LOGGING_ENABLED.getBoolean());
|
||||
boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
|
||||
Parameters.BULK_FAIL_ON_ERROR.getBoolean());
|
||||
this.isBulkLoggingEnabled = enableBulkLogging;
|
||||
this.failOnError = failOnBulkError;
|
||||
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings);
|
||||
|
|
|
@ -50,7 +50,7 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
|
||||
private final long minVolumePerRequest;
|
||||
|
||||
private final long maxVolumePerRequest;
|
||||
private long maxVolumePerRequest;
|
||||
|
||||
private Long started;
|
||||
|
||||
|
@ -66,10 +66,10 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
|
||||
Settings settings) {
|
||||
this.bulkProcessor = bulkProcessor;
|
||||
int ringBufferSize = settings.getAsInt(Parameters.RING_BUFFER_SIZE.getName(),
|
||||
Parameters.RING_BUFFER_SIZE.getInteger());
|
||||
String measureIntervalStr = settings.get(Parameters.MEASURE_INTERVAL.getName(),
|
||||
Parameters.MEASURE_INTERVAL.getString());
|
||||
int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(),
|
||||
Parameters.BULK_RING_BUFFER_SIZE.getInteger());
|
||||
String measureIntervalStr = settings.get(Parameters.BULK_MEASURE_INTERVAL.getName(),
|
||||
Parameters.BULK_MEASURE_INTERVAL.getString());
|
||||
TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr,
|
||||
TimeValue.timeValueSeconds(1), "");
|
||||
this.measureIntervalSeconds = measureInterval.seconds();
|
||||
|
@ -82,15 +82,15 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
this.submitted = new CountMetric();
|
||||
this.succeeded = new CountMetric();
|
||||
this.failed = new CountMetric();
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
this.minVolumePerRequest = minVolumePerRequest.bytes();
|
||||
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m"));
|
||||
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m"));
|
||||
this.maxVolumePerRequest = maxVolumePerRequest.bytes();
|
||||
this.currentVolumePerRequest = minVolumePerRequest.bytes();
|
||||
String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(),
|
||||
Parameters.METRIC_LOG_INTERVAL.getString());
|
||||
String metricLogIntervalStr = settings.get(Parameters.BULK_METRIC_LOG_INTERVAL.getName(),
|
||||
Parameters.BULK_METRIC_LOG_INTERVAL.getString());
|
||||
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
|
||||
TimeValue.timeValueSeconds(10), "");
|
||||
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
|
||||
|
@ -183,22 +183,31 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
" deltapercent = " + deltaPercent +
|
||||
" vol = " + currentVolumePerRequest);
|
||||
}
|
||||
if ((lastThroughput == null || throughput < 100000) && stat1.getMax() < 5000) {
|
||||
if ((lastThroughput == null || throughput < 1000000) && stat1.getAverage() < 5000) {
|
||||
double k = 0.5;
|
||||
double d = (1 / (1 + Math.exp(-(((double)x)) * k)));
|
||||
currentVolumePerRequest += d * currentVolumePerRequest;
|
||||
if (currentVolumePerRequest > maxVolumePerRequest) {
|
||||
currentVolumePerRequest = maxVolumePerRequest;
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("increasing volume to " + currentVolumePerRequest + " max volume = " + maxVolumePerRequest);
|
||||
}
|
||||
}
|
||||
bulkProcessor.setMaxBulkVolume(currentVolumePerRequest);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("metric: increase volume to " + currentVolumePerRequest);
|
||||
} else if (stat1.getAverage() >= 5000) {
|
||||
if (currentVolumePerRequest == maxVolumePerRequest) {
|
||||
// subtract 10% from max
|
||||
this.maxVolumePerRequest -= (maxVolumePerRequest / 10);
|
||||
if (maxVolumePerRequest < 1024) {
|
||||
maxVolumePerRequest = 1024;
|
||||
}
|
||||
}
|
||||
} else if (deltaPercent > 10 || stat1.getMax() >= 5000) {
|
||||
// fall back to minimal volume
|
||||
currentVolumePerRequest = minVolumePerRequest;
|
||||
bulkProcessor.setMaxBulkVolume(currentVolumePerRequest);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("metric: decrease volume to " + currentVolumePerRequest);
|
||||
logger.debug("decreasing volume to " + currentVolumePerRequest + " new max volume = " + maxVolumePerRequest);
|
||||
}
|
||||
}
|
||||
lastThroughput = throughput;
|
||||
|
|
|
@ -60,14 +60,14 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
|
||||
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
|
||||
this.bulkClient = bulkClient;
|
||||
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(),
|
||||
Parameters.MAX_ACTIONS_PER_REQUEST.getInteger());
|
||||
String flushIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(),
|
||||
Parameters.FLUSH_INTERVAL.getString());
|
||||
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
|
||||
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
|
||||
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
|
||||
Parameters.BULK_FLUSH_INTERVAL.getString());
|
||||
TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr,
|
||||
TimeValue.timeValueSeconds(30), "");
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
this.client = bulkClient.getClient();
|
||||
if (flushInterval.millis() > 0L) {
|
||||
this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
|
||||
|
@ -80,7 +80,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
this.closed = new AtomicBoolean(false);
|
||||
this.enabled = new AtomicBoolean(false);
|
||||
this.executionIdGen = new AtomicLong();
|
||||
this.permits = settings.getAsInt(Parameters.PERMITS.getName(), Parameters.PERMITS.getInteger());
|
||||
this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger());
|
||||
if (permits < 1) {
|
||||
throw new IllegalArgumentException("must not be less 1 permits for bulk indexing");
|
||||
}
|
||||
|
|
|
@ -73,8 +73,8 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
|
||||
public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings)
|
||||
throws IOException {
|
||||
String timeValueStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(),
|
||||
Parameters.MAX_WAIT_BULK_RESPONSE.getString());
|
||||
String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(),
|
||||
Parameters.BULK_MAX_WAIT_RESPONSE.getString());
|
||||
TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), "");
|
||||
setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS);
|
||||
String indexName = settings.get("name", index);
|
||||
|
@ -85,10 +85,10 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
setEnabled(enabled);
|
||||
String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName);
|
||||
setFullIndexName(fullIndexName);
|
||||
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(),
|
||||
Parameters.START_BULK_REFRESH_SECONDS.getInteger()));
|
||||
setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(),
|
||||
Parameters.STOP_BULK_REFRESH_SECONDS.getInteger()));
|
||||
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(),
|
||||
Parameters.BULK_START_REFRESH_SECONDS.getInteger()));
|
||||
setStopBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_STOP_REFRESH_SECONDS.getName(),
|
||||
Parameters.BULK_STOP_REFRESH_SECONDS.getInteger()));
|
||||
if (settings.get("settings") != null && settings.get("mapping") != null) {
|
||||
setSettings(findSettingsFrom(settings.get("settings")));
|
||||
setMappings(findMappingsFrom(settings.get("mapping")));
|
||||
|
@ -101,7 +101,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault())
|
||||
.withZone(ZoneId.systemDefault());
|
||||
setDateTimeFormatter(dateTimeFormatter);
|
||||
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$");
|
||||
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$");
|
||||
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
|
||||
setDateTimePattern(dateTimePattern);
|
||||
String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now());
|
||||
|
|
|
@ -46,8 +46,8 @@ public class DefaultSearchMetric implements SearchMetric {
|
|||
emptyQueries = new CountMetric();
|
||||
failedQueries = new CountMetric();
|
||||
timeoutQueries = new CountMetric();
|
||||
String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(),
|
||||
Parameters.METRIC_LOG_INTERVAL.getString());
|
||||
String metricLogIntervalStr = settings.get(Parameters.SEARCH_METRIC_LOG_INTERVAL.getName(),
|
||||
Parameters.SEARCH_METRIC_LOG_INTERVAL.getString());
|
||||
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
|
||||
TimeValue.timeValueSeconds(10), "");
|
||||
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
|
||||
|
|
|
@ -4,31 +4,33 @@ public enum Parameters {
|
|||
|
||||
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
|
||||
|
||||
MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"),
|
||||
BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"),
|
||||
|
||||
START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1),
|
||||
BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1),
|
||||
|
||||
STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
|
||||
BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
|
||||
|
||||
ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true),
|
||||
BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true),
|
||||
|
||||
FAIL_ON_BULK_ERROR("bulk.fail_on_error", Boolean.class, true),
|
||||
BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true),
|
||||
|
||||
MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1),
|
||||
BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1),
|
||||
|
||||
MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"),
|
||||
BULK_MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"),
|
||||
|
||||
MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "5m"),
|
||||
BULK_MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1m"),
|
||||
|
||||
FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"),
|
||||
BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"),
|
||||
|
||||
MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
|
||||
BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
|
||||
|
||||
METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"),
|
||||
BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"),
|
||||
|
||||
RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
|
||||
BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
|
||||
|
||||
PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1);
|
||||
BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
|
||||
|
||||
SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s");
|
||||
|
||||
private final String name;
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ class DuplicateIDTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
|
|
@ -44,7 +44,7 @@ class SearchTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client)
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.startBulk(indexDefinition);
|
||||
|
|
|
@ -36,7 +36,7 @@ class DuplicateIDTest {
|
|||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
|
|
@ -46,7 +46,7 @@ class SearchTest {
|
|||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.startBulk(indexDefinition);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
group = org.xbib
|
||||
name = elx
|
||||
version = 2.2.1.39
|
||||
version = 2.2.1.41
|
||||
|
||||
gradle.wrapper.version = 6.6.1
|
||||
xbib-metrics.version = 2.1.0
|
||||
|
|
Loading…
Reference in a new issue