diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index f188f83..7d4c094 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -305,15 +305,17 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements logger.info("before pruning: index = {} full index = {} delta = {} mintokeep = {} pattern = {}", index, protectedIndexName, delta, mintokeep, pattern); if (delta == 0 && mintokeep == 0) { + logger.info("no candidates found, delta is 0 and mintokeep is 0"); return new EmptyPruneResult(); } if (index.equals(protectedIndexName)) { + logger.info("no candidates found, only protected index name is given"); return new EmptyPruneResult(); } ensureClientIsPresent(); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); - logger.info("before pruning: found total of {} indices", getIndexResponse.getIndices().length); + logger.info("before pruning: protected = " + protectedIndexName + " found total of {} indices", getIndexResponse.getIndices().length); List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); @@ -322,6 +324,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } if (candidateIndices.isEmpty()) { + logger.info("no candidates found"); return new EmptyPruneResult(); } if (mintokeep > 0 && candidateIndices.size() <= mintokeep) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 178f873..0d8ceeb 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -31,10 +31,10 @@ public class DefaultBulkListener implements BulkListener { ScheduledThreadPoolExecutor scheduler, Settings settings) { this.bulkProcessor = bulkProcessor; - boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), - Parameters.ENABLE_BULK_LOGGING.getBoolean()); - boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), - Parameters.FAIL_ON_BULK_ERROR.getBoolean()); + boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(), + Parameters.BULK_LOGGING_ENABLED.getBoolean()); + boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(), + Parameters.BULK_FAIL_ON_ERROR.getBoolean()); this.isBulkLoggingEnabled = enableBulkLogging; this.failOnError = failOnBulkError; this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index f983a76..da25255 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -50,7 +50,7 @@ public class DefaultBulkMetric implements BulkMetric { private final long minVolumePerRequest; - private final long maxVolumePerRequest; + private long maxVolumePerRequest; private Long started; @@ -66,10 +66,10 @@ public class DefaultBulkMetric implements BulkMetric { ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, Settings settings) { this.bulkProcessor = bulkProcessor; - int ringBufferSize = settings.getAsInt(Parameters.RING_BUFFER_SIZE.getName(), - Parameters.RING_BUFFER_SIZE.getInteger()); - String measureIntervalStr = settings.get(Parameters.MEASURE_INTERVAL.getName(), - Parameters.MEASURE_INTERVAL.getString()); + int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(), + Parameters.BULK_RING_BUFFER_SIZE.getInteger()); + String measureIntervalStr = settings.get(Parameters.BULK_MEASURE_INTERVAL.getName(), + Parameters.BULK_MEASURE_INTERVAL.getString()); TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr, TimeValue.timeValueSeconds(1), ""); this.measureIntervalSeconds = measureInterval.seconds(); @@ -82,15 +82,15 @@ public class DefaultBulkMetric implements BulkMetric { this.submitted = new CountMetric(); this.succeeded = new CountMetric(); this.failed = new CountMetric(); - ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k")); + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k")); this.minVolumePerRequest = minVolumePerRequest.bytes(); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m")); this.maxVolumePerRequest = maxVolumePerRequest.bytes(); this.currentVolumePerRequest = minVolumePerRequest.bytes(); - String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(), - Parameters.METRIC_LOG_INTERVAL.getString()); + String metricLogIntervalStr = settings.get(Parameters.BULK_METRIC_LOG_INTERVAL.getName(), + Parameters.BULK_METRIC_LOG_INTERVAL.getString()); TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue.timeValueSeconds(10), ""); this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); @@ -183,22 +183,31 @@ public class DefaultBulkMetric implements BulkMetric { " deltapercent = " + deltaPercent + " vol = " + currentVolumePerRequest); } - if ((lastThroughput == null || throughput < 100000) && stat1.getMax() < 5000) { + if ((lastThroughput == null || throughput < 1000000) && stat1.getAverage() < 5000) { double k = 0.5; double d = (1 / (1 + Math.exp(-(((double)x)) * k))); currentVolumePerRequest += d * currentVolumePerRequest; if (currentVolumePerRequest > maxVolumePerRequest) { currentVolumePerRequest = maxVolumePerRequest; + } else { + if (logger.isDebugEnabled()) { + logger.debug("increasing volume to " + currentVolumePerRequest + " max volume = " + maxVolumePerRequest); + } } bulkProcessor.setMaxBulkVolume(currentVolumePerRequest); - if (logger.isDebugEnabled()) { - logger.debug("metric: increase volume to " + currentVolumePerRequest); + } else if (stat1.getAverage() >= 5000) { + if (currentVolumePerRequest == maxVolumePerRequest) { + // subtract 10% from max + this.maxVolumePerRequest -= (maxVolumePerRequest / 10); + if (maxVolumePerRequest < 1024) { + maxVolumePerRequest = 1024; + } } - } else if (deltaPercent > 10 || stat1.getMax() >= 5000) { + // fall back to minimal volume currentVolumePerRequest = minVolumePerRequest; bulkProcessor.setMaxBulkVolume(currentVolumePerRequest); if (logger.isDebugEnabled()) { - logger.debug("metric: decrease volume to " + currentVolumePerRequest); + logger.debug("decreasing volume to " + currentVolumePerRequest + " new max volume = " + maxVolumePerRequest); } } lastThroughput = throughput; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 1267381..0bc417b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -60,14 +60,14 @@ public class DefaultBulkProcessor implements BulkProcessor { public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) { this.bulkClient = bulkClient; - int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), - Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); - String flushIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), - Parameters.FLUSH_INTERVAL.getString()); + int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), + Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger()); + String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(), + Parameters.BULK_FLUSH_INTERVAL.getString()); TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr, TimeValue.timeValueSeconds(30), ""); - ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(), - ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k")); + ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k")); this.client = bulkClient.getClient(); if (flushInterval.millis() > 0L) { this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(), @@ -80,7 +80,7 @@ public class DefaultBulkProcessor implements BulkProcessor { this.closed = new AtomicBoolean(false); this.enabled = new AtomicBoolean(false); this.executionIdGen = new AtomicLong(); - this.permits = settings.getAsInt(Parameters.PERMITS.getName(), Parameters.PERMITS.getInteger()); + this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger()); if (permits < 1) { throw new IllegalArgumentException("must not be less 1 permits for bulk indexing"); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index ada8db9..8b3ce52 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -73,8 +73,8 @@ public class DefaultIndexDefinition implements IndexDefinition { public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) throws IOException { - String timeValueStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), - Parameters.MAX_WAIT_BULK_RESPONSE.getString()); + String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(), + Parameters.BULK_MAX_WAIT_RESPONSE.getString()); TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), ""); setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); String indexName = settings.get("name", index); @@ -85,10 +85,10 @@ public class DefaultIndexDefinition implements IndexDefinition { setEnabled(enabled); String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); setFullIndexName(fullIndexName); - setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), - Parameters.START_BULK_REFRESH_SECONDS.getInteger())); - setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), - Parameters.STOP_BULK_REFRESH_SECONDS.getInteger())); + setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(), + Parameters.BULK_START_REFRESH_SECONDS.getInteger())); + setStopBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_STOP_REFRESH_SECONDS.getName(), + Parameters.BULK_STOP_REFRESH_SECONDS.getInteger())); if (settings.get("settings") != null && settings.get("mapping") != null) { setSettings(findSettingsFrom(settings.get("settings"))); setMappings(findMappingsFrom(settings.get("mapping"))); @@ -101,7 +101,7 @@ public class DefaultIndexDefinition implements IndexDefinition { DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) .withZone(ZoneId.systemDefault()); setDateTimeFormatter(dateTimeFormatter); - String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$"); Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); setDateTimePattern(dateTimePattern); String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java index 157d514..cde8c5f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultSearchMetric.java @@ -46,8 +46,8 @@ public class DefaultSearchMetric implements SearchMetric { emptyQueries = new CountMetric(); failedQueries = new CountMetric(); timeoutQueries = new CountMetric(); - String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(), - Parameters.METRIC_LOG_INTERVAL.getString()); + String metricLogIntervalStr = settings.get(Parameters.SEARCH_METRIC_LOG_INTERVAL.getName(), + Parameters.SEARCH_METRIC_LOG_INTERVAL.getString()); TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr, TimeValue.timeValueSeconds(10), ""); this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS); diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index af98693..1dd55bc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -4,31 +4,33 @@ public enum Parameters { DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), - MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"), + BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"), - START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), + BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1), - STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), + BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), - ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true), + BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true), - FAIL_ON_BULK_ERROR("bulk.fail_on_error", Boolean.class, true), + BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true), - MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), + BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1), - MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"), + BULK_MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"), - MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "5m"), + BULK_MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1m"), - FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"), + BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"), - MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), + BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"), - METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), + BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"), - RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), + BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()), - PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1); + BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1), + + SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s"); private final String name; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index fb0e255..e7dce75 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -38,7 +38,7 @@ class DuplicateIDTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index 2646979..bb62788 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -44,7 +44,7 @@ class SearchTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 36045d7..d7a3e5e 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -36,7 +36,7 @@ class DuplicateIDTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 384f681..9d36609 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -46,7 +46,7 @@ class SearchTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); diff --git a/gradle.properties b/gradle.properties index 883fea9..8687377 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.39 +version = 2.2.1.41 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0