better bulk control, keep last bulk response times

This commit is contained in:
Jörg Prante 2021-04-14 16:53:23 +02:00
parent 3aa46a7f30
commit ed32b98383
10 changed files with 164 additions and 87 deletions

View file

@ -16,6 +16,8 @@ public interface BulkController extends Closeable, Flushable {
void inactivate();
BulkProcessor getBulkProcessor();
BulkMetric getBulkMetric();
Throwable getLastBulkError();

View file

@ -8,11 +8,17 @@ import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable {
void setBulkActions(int bulkActions);
int getBulkActions();
void setBulkSize(long bulkSize);
long getBulkSize();
BulkProcessor add(ActionRequest<?> request);
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
BulkListener getBulkListener();
}

View file

@ -46,10 +46,6 @@ public interface IndexDefinition {
boolean isEnabled();
IndexDefinition setIgnoreErrors(boolean ignoreErrors);
boolean ignoreErrors();
IndexDefinition setShift(boolean shift);
boolean isShiftEnabled();

View file

@ -283,18 +283,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
}
}
if (!indicesAliasesRequest.getAliasActions().isEmpty()) {
logger.debug("indices alias request = {}", indicesAliasesRequest.getAliasActions().toString());
IndicesAliasesResponse indicesAliasesResponse =
client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet();
logger.debug("response isAcknowledged = {}",
indicesAliasesResponse.isAcknowledged());
client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet();
}
return new SuccessIndexShiftResult(moveAliases, newAliases);
}
@Override
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
return indexDefinition.isPruneEnabled() ?
return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() ?
pruneIndex(indexDefinition.getIndex(),
indexDefinition.getFullIndexName(),
indexDefinition.getDateTimePattern(),
@ -324,8 +320,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
Matcher m = pattern.matcher(s);
if (m.matches() && m.group(1).equals(index) && !s.equals(protectedIndexName)) {
candidateIndices.add(s);
} else {
logger.info("not a candidate: " + s);
}
}
if (candidateIndices.isEmpty()) {

View file

@ -31,9 +31,9 @@ public class DefaultBulkController implements BulkController {
private BulkListener bulkListener;
private final long maxWaitTime;
private long maxWaitTime;
private final TimeUnit maxWaitTimeUnit;
private TimeUnit maxWaitTimeUnit;
private final AtomicBoolean active;
@ -41,23 +41,17 @@ public class DefaultBulkController implements BulkController {
this.bulkClient = bulkClient;
this.bulkMetric = new DefaultBulkMetric();
this.active = new AtomicBoolean(false);
this.maxWaitTime = 30L;
this.maxWaitTimeUnit = TimeUnit.SECONDS;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public Throwable getLastBulkError() {
return bulkListener.getLastBulkError();
}
@Override
public void init(Settings settings) {
bulkMetric.init(settings);
String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(),
Parameters.MAX_WAIT_BULK_RESPONSE.getString());
TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr,
TimeValue.timeValueSeconds(30), "");
this.maxWaitTime = maxWaitTimeValue.seconds();
this.maxWaitTimeUnit = TimeUnit.SECONDS;
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.MAX_ACTIONS_PER_REQUEST.getInteger());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(),
@ -72,7 +66,10 @@ public class DefaultBulkController implements BulkController {
Parameters.ENABLE_BULK_LOGGING.getBoolean());
boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging, failOnBulkError);
int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(),
Parameters.RESPONSE_TIME_COUNT.getInteger());
this.bulkListener = new DefaultBulkListener(this, bulkMetric,
enableBulkLogging, failOnBulkError, responseTimeCount);
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests)
@ -81,17 +78,32 @@ public class DefaultBulkController implements BulkController {
.build();
this.active.set(true);
if (logger.isInfoEnabled()) {
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
logger.info("bulk processor now active with maxWaitTime = {} maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {} " +
"bulk logging = {} fail on bulk error = {} " +
"logger debug = {} from settings = {}",
maxActionsPerRequest, maxConcurrentRequests,
maxWaitTimeStr, maxActionsPerRequest, maxConcurrentRequests,
flushIngestInterval, maxVolumePerRequest,
enableBulkLogging, failOnBulkError,
logger.isDebugEnabled(), settings.toDelimitedString(','));
}
}
@Override
public BulkProcessor getBulkProcessor() {
return bulkProcessor;
}
@Override
public BulkMetric getBulkMetric() {
return bulkMetric;
}
@Override
public Throwable getLastBulkError() {
return bulkListener != null ? bulkListener.getLastBulkError() : null;
}
@Override
public void inactivate() {
this.active.set(false);
@ -189,6 +201,7 @@ public class DefaultBulkController implements BulkController {
@Override
public void close() throws IOException {
bulkMetric.close();
flush();
bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit);
if (bulkProcessor != null) {

View file

@ -9,6 +9,10 @@ import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkListener;
import org.xbib.elx.api.BulkMetric;
import java.util.Arrays;
import java.util.LongSummaryStatistics;
import java.util.stream.LongStream;
public class DefaultBulkListener implements BulkListener {
private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName());
@ -21,16 +25,23 @@ public class DefaultBulkListener implements BulkListener {
private final boolean failOnError;
private Throwable lastBulkError = null;
private Throwable lastBulkError;
private final int responseTimeCount;
private final LastResponseTimes responseTimes;
public DefaultBulkListener(BulkController bulkController,
BulkMetric bulkMetric,
boolean isBulkLoggingEnabled,
boolean failOnError) {
boolean failOnError,
int responseTimeCount) {
this.bulkController = bulkController;
this.bulkMetric = bulkMetric;
this.isBulkLoggingEnabled = isBulkLoggingEnabled;
this.failOnError = failOnError;
this.responseTimeCount = responseTimeCount;
this.responseTimes = new LastResponseTimes(responseTimeCount);
}
@Override
@ -55,6 +66,16 @@ public class DefaultBulkListener implements BulkListener {
long l = bulkMetric.getCurrentIngest().getCount();
bulkMetric.getCurrentIngest().dec();
bulkMetric.getSucceeded().inc(response.getItems().length);
if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) {
LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics();
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("bulk response millis: avg = " + stat.getAverage() +
" min =" + stat.getMin() +
" max = " + stat.getMax() +
" actions = " + bulkController.getBulkProcessor().getBulkActions() +
" size = " + bulkController.getBulkProcessor().getBulkSize());
}
}
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
@ -65,7 +86,7 @@ public class DefaultBulkListener implements BulkListener {
}
}
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]",
executionId,
bulkMetric.getSucceeded().getCount(),
bulkMetric.getFailed().getCount(),
@ -100,4 +121,30 @@ public class DefaultBulkListener implements BulkListener {
public Throwable getLastBulkError() {
return lastBulkError;
}
private static class LastResponseTimes {
private final Long[] values;
private final int limit;
private int index;
public LastResponseTimes(int limit) {
this.values = new Long[limit];
Arrays.fill(values, -1L);
this.limit = limit;
this.index = 0;
}
public int add(Long value) {
int i = index++ % limit;
values[i] = value;
return i;
}
public LongStream longStream() {
return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue);
}
}
}

View file

@ -33,12 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DefaultBulkProcessor implements BulkProcessor {
private final BulkListener bulkListener;
private final int bulkActions;
private final long bulkSize;
private final ScheduledThreadPoolExecutor scheduler;
private final ScheduledFuture<?> scheduledFuture;
@ -49,6 +43,10 @@ public class DefaultBulkProcessor implements BulkProcessor {
private BulkRequest bulkRequest;
private long bulkSize;
private int bulkActions;
private volatile boolean closed;
private DefaultBulkProcessor(ElasticsearchClient client,
@ -58,7 +56,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
int bulkActions,
ByteSizeValue bulkSize,
TimeValue flushInterval) {
this.bulkListener = bulkListener;
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
@ -86,8 +83,23 @@ public class DefaultBulkProcessor implements BulkProcessor {
}
@Override
public BulkListener getBulkListener() {
return bulkListener;
public void setBulkActions(int bulkActions) {
this.bulkActions = bulkActions;
}
@Override
public int getBulkActions() {
return bulkActions;
}
@Override
public void setBulkSize(long bulkSize) {
this.bulkSize = bulkSize;
}
@Override
public long getBulkSize() {
return bulkSize;
}
/**

View file

@ -1,6 +1,7 @@
package org.xbib.elx.common;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -14,7 +15,6 @@ import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.MalformedInputException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@ -41,8 +41,6 @@ public class DefaultIndexDefinition implements IndexDefinition {
private boolean enabled;
private boolean ignoreErrors;
private boolean shift;
private boolean prune;
@ -66,41 +64,55 @@ public class DefaultIndexDefinition implements IndexDefinition {
setType(type);
setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
setFullIndexName(index + getDateTimeFormatter().format(LocalDate.now()));
setEnabled(true);
setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now()));
setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS);
setShift(false);
setPrune(false);
setEnabled(true);
}
public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings)
throws IOException {
boolean isEnabled = settings.getAsBoolean("enabled", true);
TimeValue timeValue = settings.getAsTime(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), TimeValue.timeValueSeconds(30));
setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS);
String indexName = settings.get("name", index);
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$");
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat)
.withZone(ZoneId.systemDefault());
String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now());
String fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName);
IndexRetention indexRetention = new DefaultIndexRetention()
.setMinToKeep(settings.getAsInt("retention.mintokeep", 0))
.setDelta(settings.getAsInt("retention.delta", 0));
setEnabled(isEnabled)
.setIndex(indexName)
.setType(type)
.setFullIndexName(fullIndexName)
.setSettings(findSettingsFrom(settings.get("settings")))
.setMappings(findMappingsFrom(settings.get("mapping")))
.setDateTimeFormatter(dateTimeFormatter)
.setDateTimePattern(dateTimePattern)
.setIgnoreErrors(settings.getAsBoolean("skiperrors", false))
.setShift(settings.getAsBoolean("shift", false))
.setPrune(settings.getAsBoolean("prune", false))
.setReplicaLevel(settings.getAsInt("replica", 0))
.setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS)
.setRetention(indexRetention)
.setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1))
.setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1));
String indexType = settings.get("type", type);
boolean enabled = settings.getAsBoolean("enabled", true);
setIndex(indexName);
setType(indexType);
setEnabled(enabled);
String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName);
setFullIndexName(fullIndexName);
if (settings.get("settings") != null && settings.get("mapping") != null) {
setSettings(findSettingsFrom(settings.get("settings")));
setMappings(findMappingsFrom(settings.get("mapping")));
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1));
setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1));
setReplicaLevel(settings.getAsInt("replica", 0));
boolean shift = settings.getAsBoolean("shift", false);
setShift(shift);
if (shift) {
String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(),
Parameters.DATE_TIME_FORMAT.getString());
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault())
.withZone(ZoneId.systemDefault());
setDateTimeFormatter(dateTimeFormatter);
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$");
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
setDateTimePattern(dateTimePattern);
String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now());
fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName);
setFullIndexName(fullIndexName);
boolean prune = settings.getAsBoolean("prune", false);
setPrune(prune);
if (prune) {
IndexRetention indexRetention = new DefaultIndexRetention()
.setMinToKeep(settings.getAsInt("retention.mintokeep", 0))
.setDelta(settings.getAsInt("retention.delta", 0));
setRetention(indexRetention);
}
}
}
}
@Override
@ -214,17 +226,6 @@ public class DefaultIndexDefinition implements IndexDefinition {
return enabled;
}
@Override
public IndexDefinition setIgnoreErrors(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
return this;
}
@Override
public boolean ignoreErrors() {
return ignoreErrors;
}
@Override
public IndexDefinition setShift(boolean shift) {
this.shift = shift;

View file

@ -2,6 +2,10 @@ package org.xbib.elx.common;
public enum Parameters {
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"),
MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30),
START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0),
@ -14,6 +18,8 @@ public enum Parameters {
MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000),
RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 64),
// 0 = 1 CPU, synchronous requests, &gt; 0 = n + 1 CPUs, asynchronous requests
MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1),

View file

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 2.2.1.34
version = 2.2.1.35
gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0