diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java index 539f01b..7d76416 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -16,6 +16,8 @@ public interface BulkController extends Closeable, Flushable { void inactivate(); + BulkProcessor getBulkProcessor(); + BulkMetric getBulkMetric(); Throwable getLastBulkError(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index 833c31f..992d90a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -8,11 +8,17 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { + void setBulkActions(int bulkActions); + + int getBulkActions(); + + void setBulkSize(long bulkSize); + + long getBulkSize(); + BulkProcessor add(ActionRequest request); boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; - - BulkListener getBulkListener(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index c207311..00946b8 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -46,10 +46,6 @@ public interface IndexDefinition { boolean isEnabled(); - IndexDefinition setIgnoreErrors(boolean ignoreErrors); - - boolean ignoreErrors(); - IndexDefinition setShift(boolean shift); boolean isShiftEnabled(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 7a35f9b..d8db967 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -283,18 +283,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } if (!indicesAliasesRequest.getAliasActions().isEmpty()) { - logger.debug("indices alias request = {}", indicesAliasesRequest.getAliasActions().toString()); - IndicesAliasesResponse indicesAliasesResponse = - client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); - logger.debug("response isAcknowledged = {}", - indicesAliasesResponse.isAcknowledged()); + client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); } return new SuccessIndexShiftResult(moveAliases, newAliases); } @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { - return indexDefinition.isPruneEnabled() ? + return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() ? pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), indexDefinition.getDateTimePattern(), @@ -324,8 +320,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements Matcher m = pattern.matcher(s); if (m.matches() && m.group(1).equals(index) && !s.equals(protectedIndexName)) { candidateIndices.add(s); - } else { - logger.info("not a candidate: " + s); } } if (candidateIndices.isEmpty()) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index d487ba5..a4d1d8d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -31,9 +31,9 @@ public class DefaultBulkController implements BulkController { private BulkListener bulkListener; - private final long maxWaitTime; + private long maxWaitTime; - private final TimeUnit maxWaitTimeUnit; + private TimeUnit maxWaitTimeUnit; private final AtomicBoolean active; @@ -41,23 +41,17 @@ public class DefaultBulkController implements BulkController { this.bulkClient = bulkClient; this.bulkMetric = new DefaultBulkMetric(); this.active = new AtomicBoolean(false); - this.maxWaitTime = 30L; - this.maxWaitTimeUnit = TimeUnit.SECONDS; - } - - @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - - @Override - public Throwable getLastBulkError() { - return bulkListener.getLastBulkError(); } @Override public void init(Settings settings) { bulkMetric.init(settings); + String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), + Parameters.MAX_WAIT_BULK_RESPONSE.getString()); + TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr, + TimeValue.timeValueSeconds(30), ""); + this.maxWaitTime = maxWaitTimeValue.seconds(); + this.maxWaitTimeUnit = TimeUnit.SECONDS; int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), @@ -72,7 +66,10 @@ public class DefaultBulkController implements BulkController { Parameters.ENABLE_BULK_LOGGING.getBoolean()); boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), Parameters.FAIL_ON_BULK_ERROR.getBoolean()); - this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging, failOnBulkError); + int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), + Parameters.RESPONSE_TIME_COUNT.getInteger()); + this.bulkListener = new DefaultBulkListener(this, bulkMetric, + enableBulkLogging, failOnBulkError, responseTimeCount); this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) @@ -81,17 +78,32 @@ public class DefaultBulkController implements BulkController { .build(); this.active.set(true); if (logger.isInfoEnabled()) { - logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + + logger.info("bulk processor now active with maxWaitTime = {} maxActionsPerRequest = {} maxConcurrentRequests = {} " + "flushIngestInterval = {} maxVolumePerRequest = {} " + "bulk logging = {} fail on bulk error = {} " + "logger debug = {} from settings = {}", - maxActionsPerRequest, maxConcurrentRequests, + maxWaitTimeStr, maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest, enableBulkLogging, failOnBulkError, logger.isDebugEnabled(), settings.toDelimitedString(',')); } } + @Override + public BulkProcessor getBulkProcessor() { + return bulkProcessor; + } + + @Override + public BulkMetric getBulkMetric() { + return bulkMetric; + } + + @Override + public Throwable getLastBulkError() { + return bulkListener != null ? bulkListener.getLastBulkError() : null; + } + @Override public void inactivate() { this.active.set(false); @@ -189,6 +201,7 @@ public class DefaultBulkController implements BulkController { @Override public void close() throws IOException { + bulkMetric.close(); flush(); bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); if (bulkProcessor != null) { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 2d25d8f..686a031 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -9,6 +9,10 @@ import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; +import java.util.Arrays; +import java.util.LongSummaryStatistics; +import java.util.stream.LongStream; + public class DefaultBulkListener implements BulkListener { private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName()); @@ -21,16 +25,23 @@ public class DefaultBulkListener implements BulkListener { private final boolean failOnError; - private Throwable lastBulkError = null; + private Throwable lastBulkError; + + private final int responseTimeCount; + + private final LastResponseTimes responseTimes; public DefaultBulkListener(BulkController bulkController, BulkMetric bulkMetric, boolean isBulkLoggingEnabled, - boolean failOnError) { + boolean failOnError, + int responseTimeCount) { this.bulkController = bulkController; this.bulkMetric = bulkMetric; this.isBulkLoggingEnabled = isBulkLoggingEnabled; this.failOnError = failOnError; + this.responseTimeCount = responseTimeCount; + this.responseTimes = new LastResponseTimes(responseTimeCount); } @Override @@ -55,6 +66,16 @@ public class DefaultBulkListener implements BulkListener { long l = bulkMetric.getCurrentIngest().getCount(); bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); + if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) { + LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics(); + if (isBulkLoggingEnabled && logger.isDebugEnabled()) { + logger.debug("bulk response millis: avg = " + stat.getAverage() + + " min =" + stat.getMin() + + " max = " + stat.getMax() + + " actions = " + bulkController.getBulkProcessor().getBulkActions() + + " size = " + bulkController.getBulkProcessor().getBulkSize()); + } + } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); @@ -65,7 +86,7 @@ public class DefaultBulkListener implements BulkListener { } } if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]", executionId, bulkMetric.getSucceeded().getCount(), bulkMetric.getFailed().getCount(), @@ -100,4 +121,30 @@ public class DefaultBulkListener implements BulkListener { public Throwable getLastBulkError() { return lastBulkError; } + + private static class LastResponseTimes { + + private final Long[] values; + + private final int limit; + + private int index; + + public LastResponseTimes(int limit) { + this.values = new Long[limit]; + Arrays.fill(values, -1L); + this.limit = limit; + this.index = 0; + } + + public int add(Long value) { + int i = index++ % limit; + values[i] = value; + return i; + } + + public LongStream longStream() { + return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue); + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 60267e9..d326023 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -33,12 +33,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class DefaultBulkProcessor implements BulkProcessor { - private final BulkListener bulkListener; - - private final int bulkActions; - - private final long bulkSize; - private final ScheduledThreadPoolExecutor scheduler; private final ScheduledFuture scheduledFuture; @@ -49,6 +43,10 @@ public class DefaultBulkProcessor implements BulkProcessor { private BulkRequest bulkRequest; + private long bulkSize; + + private int bulkActions; + private volatile boolean closed; private DefaultBulkProcessor(ElasticsearchClient client, @@ -58,7 +56,6 @@ public class DefaultBulkProcessor implements BulkProcessor { int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { - this.bulkListener = bulkListener; this.executionIdGen = new AtomicLong(); this.closed = false; this.bulkActions = bulkActions; @@ -86,8 +83,23 @@ public class DefaultBulkProcessor implements BulkProcessor { } @Override - public BulkListener getBulkListener() { - return bulkListener; + public void setBulkActions(int bulkActions) { + this.bulkActions = bulkActions; + } + + @Override + public int getBulkActions() { + return bulkActions; + } + + @Override + public void setBulkSize(long bulkSize) { + this.bulkSize = bulkSize; + } + + @Override + public long getBulkSize() { + return bulkSize; } /** diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index d6c825f..352b38c 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -1,6 +1,7 @@ package org.xbib.elx.common; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -14,7 +15,6 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.MalformedInputException; -import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -41,8 +41,6 @@ public class DefaultIndexDefinition implements IndexDefinition { private boolean enabled; - private boolean ignoreErrors; - private boolean shift; private boolean prune; @@ -66,41 +64,55 @@ public class DefaultIndexDefinition implements IndexDefinition { setType(type); setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); - setFullIndexName(index + getDateTimeFormatter().format(LocalDate.now())); - setEnabled(true); + setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS); + setShift(false); + setPrune(false); + setEnabled(true); } public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) throws IOException { - boolean isEnabled = settings.getAsBoolean("enabled", true); + TimeValue timeValue = settings.getAsTime(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), TimeValue.timeValueSeconds(30)); + setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); String indexName = settings.get("name", index); - String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); - Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); - String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd"); - DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat) - .withZone(ZoneId.systemDefault()); - String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); - String fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); - IndexRetention indexRetention = new DefaultIndexRetention() - .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) - .setDelta(settings.getAsInt("retention.delta", 0)); - setEnabled(isEnabled) - .setIndex(indexName) - .setType(type) - .setFullIndexName(fullIndexName) - .setSettings(findSettingsFrom(settings.get("settings"))) - .setMappings(findMappingsFrom(settings.get("mapping"))) - .setDateTimeFormatter(dateTimeFormatter) - .setDateTimePattern(dateTimePattern) - .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) - .setShift(settings.getAsBoolean("shift", false)) - .setPrune(settings.getAsBoolean("prune", false)) - .setReplicaLevel(settings.getAsInt("replica", 0)) - .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) - .setRetention(indexRetention) - .setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1)) - .setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1)); + String indexType = settings.get("type", type); + boolean enabled = settings.getAsBoolean("enabled", true); + setIndex(indexName); + setType(indexType); + setEnabled(enabled); + String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); + setFullIndexName(fullIndexName); + if (settings.get("settings") != null && settings.get("mapping") != null) { + setSettings(findSettingsFrom(settings.get("settings"))); + setMappings(findMappingsFrom(settings.get("mapping"))); + setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1)); + setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1)); + setReplicaLevel(settings.getAsInt("replica", 0)); + boolean shift = settings.getAsBoolean("shift", false); + setShift(shift); + if (shift) { + String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(), + Parameters.DATE_TIME_FORMAT.getString()); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) + .withZone(ZoneId.systemDefault()); + setDateTimeFormatter(dateTimeFormatter); + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); + Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); + setDateTimePattern(dateTimePattern); + String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); + fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); + setFullIndexName(fullIndexName); + boolean prune = settings.getAsBoolean("prune", false); + setPrune(prune); + if (prune) { + IndexRetention indexRetention = new DefaultIndexRetention() + .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) + .setDelta(settings.getAsInt("retention.delta", 0)); + setRetention(indexRetention); + } + } + } } @Override @@ -214,17 +226,6 @@ public class DefaultIndexDefinition implements IndexDefinition { return enabled; } - @Override - public IndexDefinition setIgnoreErrors(boolean ignoreErrors) { - this.ignoreErrors = ignoreErrors; - return this; - } - - @Override - public boolean ignoreErrors() { - return ignoreErrors; - } - @Override public IndexDefinition setShift(boolean shift) { this.shift = shift; diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index e042904..0dabaf9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,6 +2,10 @@ package org.xbib.elx.common; public enum Parameters { + DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), + + MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"), + MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30), START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0), @@ -14,6 +18,8 @@ public enum Parameters { MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000), + RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 64), + // 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1), diff --git a/gradle.properties b/gradle.properties index 9fc741c..38494b7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.34 +version = 2.2.1.35 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0