clean up API, better metrics

parent 6731ad986b
commit a19f09ab1f

21 changed files with 720 additions and 135 deletions
@ -4,10 +4,13 @@ import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.settings.Settings;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

 public interface BasicClient extends Closeable {

+    void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException;
+
     /**
      * Set an Elasticsearch client to extend from it. May be null for TransportClient.
      * @param client the Elasticsearch client
@ -59,4 +62,6 @@ public interface BasicClient extends Closeable {
     long getSearchableDocs(IndexDefinition index);

     boolean isIndexExists(IndexDefinition index);
+
+    ScheduledThreadPoolExecutor getScheduler();
 }

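The two additions above extend the client contract: putClusterSetting(...) pushes a persistent cluster setting with a timeout, and getScheduler() exposes the client's shared ScheduledThreadPoolExecutor. A minimal usage sketch (not part of the commit; the concrete client instance and the setting key/value are illustrative):

    // Illustrative only: how a caller might use the new BasicClient API.
    // Assumes "client" is an initialized BasicClient implementation from this library.
    import java.io.IOException;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.xbib.elx.api.BasicClient;

    class BasicClientUsageSketch {
        void example(BasicClient client) throws IOException {
            // Persist a cluster-wide setting, waiting up to 30 seconds for acknowledgement.
            client.putClusterSetting("cluster.routing.allocation.enable", "all", 30L, TimeUnit.SECONDS);
            // Reuse the client's shared scheduler instead of creating a private thread pool.
            ScheduledThreadPoolExecutor scheduler = client.getScheduler();
            scheduler.schedule(() -> System.out.println("deferred housekeeping"), 10L, TimeUnit.SECONDS);
        }
    }
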
@ -5,6 +5,7 @@ import org.elasticsearch.action.ActionRequest;
 import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

 public interface BulkProcessor extends Closeable, Flushable {

@ -1,6 +1,7 @@
 dependencies {
     api project(':elx-api')
     implementation "org.xbib:guice:${project.property('xbib-guice.version')}"
+    implementation "org.xbib:time:${project.property('xbib-time.version')}"
     runtimeOnly "com.vividsolutions:jts:${project.property('jts.version')}"
     runtimeOnly "com.github.spullara.mustache.java:compiler:${project.property('mustache.version')}"
     runtimeOnly "net.java.dev.jna:jna:${project.property('jna.version')}"

@ -9,7 +9,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
@ -83,7 +82,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public Map<String, ?> getMapping(IndexDefinition indexDefinition) throws IOException {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return null;
         }
         GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
@ -98,7 +97,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public AdminClient deleteIndex(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return null;
         }
         ensureClientIsPresent();
@ -115,7 +114,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return null;
         }
         if (level < 1) {
@ -131,7 +130,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public int getReplicaLevel(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return -1;
         }
         ensureClientIsPresent();
@ -203,7 +202,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
         if (additionalAliases == null) {
             return new EmptyIndexShiftResult();
         }
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return new EmptyIndexShiftResult();
         }
         if (indexDefinition.isShiftEnabled()) {
@ -358,7 +357,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return -1L;
         }
         ensureClientIsPresent();
@ -385,7 +384,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public boolean forceMerge(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return false;
         }
         if (!indexDefinition.isForceMergeEnabled()) {
@ -425,7 +424,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements

     @Override
     public void checkMapping(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return;
         }
         ensureClientIsPresent();

@ -7,6 +7,12 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -22,10 +28,15 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.threadpool.ThreadPoolInfo;
 import org.xbib.elx.api.BasicClient;
 import org.xbib.elx.api.IndexDefinition;
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

@ -37,12 +48,23 @@ public abstract class AbstractBasicClient implements BasicClient {

     protected Settings settings;

+    private final ScheduledThreadPoolExecutor scheduler;
+
     private final AtomicBoolean closed;

     public AbstractBasicClient() {
+        this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2,
+                EsExecutors.daemonThreadFactory("elx-bulk-processor"));
+        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
         closed = new AtomicBoolean(false);
     }

+    @Override
+    public ScheduledThreadPoolExecutor getScheduler() {
+        return scheduler;
+    }
+
     @Override
     public void setClient(ElasticsearchClient client) {
         this.client = client;
@ -82,6 +104,39 @@ public abstract class AbstractBasicClient implements BasicClient {
         }
     }

+    @Override
+    public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
+        ensureClientIsPresent();
+        if (key == null) {
+            throw new IOException("no key given");
+        }
+        if (value == null) {
+            throw new IOException("no value given");
+        }
+        Settings.Builder updateSettingsBuilder = Settings.builder();
+        updateSettingsBuilder.put(key, value.toString());
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.persistentSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
+        client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
+    }
+
+    protected Long getThreadPoolQueueSize(String name) {
+        ensureClientIsPresent();
+        NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
+        nodesInfoRequest.threadPool(true);
+        NodesInfoResponse nodesInfoResponse =
+                client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet();
+        for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
+            ThreadPoolInfo threadPoolInfo = nodeInfo.getThreadPool();
+            for (ThreadPool.Info info : threadPoolInfo) {
+                if (info.getName().equals(name)) {
+                    return info.getQueueSize().getSingles();
+                }
+            }
+        }
+        return null;
+    }
+
     @Override
     public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
         ensureClientIsPresent();
@ -139,7 +194,7 @@ public abstract class AbstractBasicClient implements BasicClient {

     @Override
     public long getSearchableDocs(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return -1L;
         }
         ensureClientIsPresent();
@ -166,6 +221,9 @@ public abstract class AbstractBasicClient implements BasicClient {
         ensureClientIsPresent();
         if (closed.compareAndSet(false, true)) {
             closeClient(settings);
+            if (scheduler != null) {
+                scheduler.shutdown();
+            }
         }
     }
@ -197,12 +255,12 @@ public abstract class AbstractBasicClient implements BasicClient {
         }
     }

-    protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) {
+    protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) {
         if (!indexDefinition.isEnabled()) {
             logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled");
-            return false;
+            return true;
         }
-        return true;
+        return false;
     }

     protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) {

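The helper that used to be called ensureIndexDefinition(...) answered "is this index definition usable?" and returned false for a disabled index; it is renamed to isIndexDefinitionDisabled(...) with the return value inverted, so every call site drops the negation. A sketch of the guard pattern before and after (both forms appear in the hunks above and below; any IndexDefinition works here):

    // before: true meant "enabled", so callers had to negate
    if (!ensureIndexDefinition(indexDefinition)) {
        return null; // index is disabled, skip the operation
    }

    // after: the method states what it checks, and the condition reads without negation
    if (isIndexDefinitionDisabled(indexDefinition)) {
        return null; // same early exit
    }
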
@ -32,7 +32,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     private BulkProcessor bulkProcessor;

-    private final AtomicBoolean closed = new AtomicBoolean(true);
+    private final AtomicBoolean closed;
+
+    public AbstractBulkClient() {
+        super();
+        closed = new AtomicBoolean(true);
+    }

     @Override
     public void init(Settings settings) throws IOException {
@ -59,16 +64,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
         if (closed.compareAndSet(false, true)) {
             ensureClientIsPresent();
             if (bulkProcessor != null) {
-                logger.info("closing bulk");
+                logger.info("closing bulk procesor");
                 bulkProcessor.close();
             }
             closeClient(settings);
+            super.close();
         }
     }

     @Override
     public void newIndex(IndexDefinition indexDefinition) throws IOException {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return;
         }
         String index = indexDefinition.getFullIndexName();
@ -117,16 +123,25 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     @Override
     public void startBulk(IndexDefinition indexDefinition) throws IOException {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return;
         }
         ensureClientIsPresent();
+        Long bulkQueueSize = getThreadPoolQueueSize("bulk");
+        if (bulkQueueSize != null && bulkQueueSize <= 64) {
+            logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4);
+            bulkQueueSize = bulkQueueSize * 4;
+        } else {
+            logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256");
+            bulkQueueSize = 256L;
+        }
+        putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS);
         bulkProcessor.startBulkMode(indexDefinition);
     }

     @Override
     public void stopBulk(IndexDefinition indexDefinition) throws IOException {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return;
         }
         ensureClientIsPresent();
@ -140,7 +155,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     @Override
     public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return this;
         }
         return index(new IndexRequest()
@ -158,7 +173,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     @Override
     public BulkClient delete(IndexDefinition indexDefinition, String id) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return this;
         }
         return delete(new DeleteRequest()
@ -181,7 +196,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     @Override
     public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return this;
         }
         return update(new UpdateRequest()
@ -210,7 +225,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     @Override
     public void flushIndex(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return;
         }
         ensureClientIsPresent();
@ -219,7 +234,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements

     @Override
     public void refreshIndex(IndexDefinition indexDefinition) {
-        if (!ensureIndexDefinition(indexDefinition)) {
+        if (isIndexDefinitionDisabled(indexDefinition)) {
             return;
         }
         ensureClientIsPresent();

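startBulk(...) now inspects the cluster's "bulk" thread-pool queue before switching to bulk mode and widens it through the new putClusterSetting(...). A worked sketch of that decision with illustrative numbers (note the else branch also applies when the reported queue is already larger than 64):

    // Sketch of the queue-size decision added above; values are examples only.
    Long bulkQueueSize = 64L;                  // what getThreadPoolQueueSize("bulk") might report
    if (bulkQueueSize != null && bulkQueueSize <= 64) {
        bulkQueueSize = bulkQueueSize * 4;     // 64 -> 256: quadruple a small queue
    } else {
        bulkQueueSize = 256L;                  // unknown (null) or already larger: fall back to 256
    }
    // the chosen value is then persisted cluster-wide with a 30 second timeout:
    // putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS);
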
@ -36,6 +36,13 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement

     private SearchMetric searchMetric;

+    private final AtomicBoolean closed;
+
+    public AbstractSearchClient() {
+        super();
+        this.closed = new AtomicBoolean(true);
+    }
+
     @Override
     public SearchMetric getSearchMetric() {
         return searchMetric;
@ -43,16 +50,20 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement

     @Override
     public void init(Settings settings) throws IOException {
-        super.init(settings);
-        this.searchMetric = new DefaultSearchMetric();
-        searchMetric.init(settings);
+        if (closed.compareAndSet(true, false)) {
+            super.init(settings);
+            this.searchMetric = new DefaultSearchMetric(getScheduler(), settings);
+            searchMetric.init(settings);
+        }
     }

     @Override
     public void close() throws IOException {
-        super.close();
+        if (closed.compareAndSet(false, true)) {
             if (searchMetric != null) {
                 searchMetric.close();
             }
+            super.close();
+        }
     }

@ -5,12 +5,12 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.common.settings.Settings;
 import org.xbib.elx.api.BulkListener;
 import org.xbib.elx.api.BulkMetric;
 import org.xbib.elx.api.BulkProcessor;

 import java.io.IOException;
-import java.util.LongSummaryStatistics;
 import java.util.concurrent.ScheduledThreadPoolExecutor;

 public class DefaultBulkListener implements BulkListener {
@ -27,15 +27,17 @@ public class DefaultBulkListener implements BulkListener {

     private Throwable lastBulkError;

-    public DefaultBulkListener(BulkProcessor bulkProcessor,
+    public DefaultBulkListener(DefaultBulkProcessor bulkProcessor,
                                ScheduledThreadPoolExecutor scheduler,
-                               boolean isBulkLoggingEnabled,
-                               boolean failOnError,
-                               int ringBufferSize) {
+                               Settings settings) {
         this.bulkProcessor = bulkProcessor;
-        this.isBulkLoggingEnabled = isBulkLoggingEnabled;
-        this.failOnError = failOnError;
-        this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, ringBufferSize);
+        boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(),
+                Parameters.ENABLE_BULK_LOGGING.getBoolean());
+        boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
+                Parameters.FAIL_ON_BULK_ERROR.getBoolean());
+        this.isBulkLoggingEnabled = enableBulkLogging;
+        this.failOnError = failOnBulkError;
+        this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings);
         bulkMetric.start();
     }

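DefaultBulkListener no longer takes individual flags; logging and fail-on-error behaviour are read from Settings via the Parameters enum, and the same Settings object is handed on to DefaultBulkMetric. A hedged sketch of how such settings might be assembled (the enum constants are the ones referenced above; the concrete key strings live in Parameters):

    // Sketch only: build Settings that drive listener/metric behaviour.
    Settings settings = Settings.builder()
            .put(Parameters.ENABLE_BULK_LOGGING.getName(), true)
            .put(Parameters.FAIL_ON_BULK_ERROR.getName(), false)
            .build();
    // The bulk processor then passes these straight through:
    // new DefaultBulkListener(bulkProcessor, scheduler, settings);
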
@ -1,24 +1,32 @@
 package org.xbib.elx.common;

+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.xbib.elx.api.BulkMetric;
-import org.xbib.elx.api.BulkProcessor;
 import org.xbib.metrics.api.Count;
 import org.xbib.metrics.api.Metered;
 import org.xbib.metrics.common.CountMetric;
 import org.xbib.metrics.common.Meter;

+import java.io.IOException;
 import java.util.LongSummaryStatistics;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 public class DefaultBulkMetric implements BulkMetric {

     private static final Logger logger = LogManager.getLogger(DefaultBulkMetric.class.getName());

-    private final BulkProcessor bulkProcessor;
+    private final DefaultBulkProcessor bulkProcessor;

+    private final ScheduledFuture<?> future;
+
     private final Meter totalIngest;

@ -34,24 +42,37 @@ public class DefaultBulkMetric implements BulkMetric {

     private final Count failed;

-    private Long started;
-
-    private Long stopped;
+    private final long measureIntervalSeconds;

     private final int ringBufferSize;

     private final LongRingBuffer ringBuffer;

+    private final long minVolumePerRequest;
+
+    private final long maxVolumePerRequest;
+
+    private Long started;
+
+    private Long stopped;
+
     private Double lastThroughput;

-    private long currentMaxVolume;
+    private long currentVolumePerRequest;

-    private int currentPermits;
+    private int x = 0;

-    public DefaultBulkMetric(BulkProcessor bulkProcessor,
+    public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor,
                              ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
-                             int ringBufferSize) {
+                             Settings settings) {
         this.bulkProcessor = bulkProcessor;
+        int ringBufferSize = settings.getAsInt(Parameters.RING_BUFFER_SIZE.getName(),
+                Parameters.RING_BUFFER_SIZE.getInteger());
+        String measureIntervalStr = settings.get(Parameters.MEASURE_INTERVAL.getName(),
+                Parameters.MEASURE_INTERVAL.getString());
+        TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr,
+                TimeValue.timeValueSeconds(1), "");
+        this.measureIntervalSeconds = measureInterval.seconds();
         this.totalIngest = new Meter(scheduledThreadPoolExecutor);
         this.ringBufferSize = ringBufferSize;
         this.ringBuffer = new LongRingBuffer(ringBufferSize);
@ -61,8 +82,18 @@ public class DefaultBulkMetric implements BulkMetric {
         this.submitted = new CountMetric();
         this.succeeded = new CountMetric();
         this.failed = new CountMetric();
-        this.currentMaxVolume = 1024;
-        this.currentPermits = 1;
+        ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(),
+                ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k"));
+        this.minVolumePerRequest = minVolumePerRequest.bytes();
+        ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(),
+                ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m"));
+        this.maxVolumePerRequest = maxVolumePerRequest.bytes();
+        this.currentVolumePerRequest = minVolumePerRequest.bytes();
+        String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(),
+                Parameters.METRIC_LOG_INTERVAL.getString());
+        TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
+                TimeValue.timeValueSeconds(10), "");
+        this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
     }

     @Override
@ -113,7 +144,7 @@ public class DefaultBulkMetric implements BulkMetric {
     @Override
     public void start() {
         this.started = System.nanoTime();
-        totalIngest.start(5L);
+        totalIngest.start(measureIntervalSeconds);
     }

     @Override
@ -123,13 +154,13 @@ public class DefaultBulkMetric implements BulkMetric {
     }

     @Override
-    public void close() {
+    public void close() throws IOException {
         stop();
         totalIngest.shutdown();
+        log();
+        this.future.cancel(true);
     }

-    private int x = 0;
-
     @Override
     public void recalculate(BulkRequest request, BulkResponse response) {
         if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) {
@ -138,8 +169,9 @@ public class DefaultBulkMetric implements BulkMetric {
             LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics();
             double throughput = stat2.getAverage() / stat1.getAverage();
             double delta = lastThroughput != null ? throughput - lastThroughput : 0.0d;
+            double deltaPercent = delta * 100 / throughput;
             if (logger.isDebugEnabled()) {
-                logger.debug("metric: avg = " + stat1.getAverage() +
+                logger.debug("time: avg = " + stat1.getAverage() +
                         " min = " + stat1.getMin() +
                         " max = " + stat1.getMax() +
                         " size: avg = " + stat2.getAverage() +
@ -148,35 +180,52 @@ public class DefaultBulkMetric implements BulkMetric {
                         " last throughput: " + lastThroughput + " bytes/ms" +
                         " throughput: " + throughput + " bytes/ms" +
                         " delta = " + delta +
-                        " vol = " + currentMaxVolume);
+                        " deltapercent = " + deltaPercent +
+                        " vol = " + currentVolumePerRequest);
             }
-            if (lastThroughput == null || throughput < 10000) {
+            if ((lastThroughput == null || throughput < 100000) && stat1.getMax() < 5000) {
                 double k = 0.5;
                 double d = (1 / (1 + Math.exp(-(((double)x)) * k)));
-                logger.debug("inc: x = " + x + " d = " + d);
-                currentMaxVolume += d * currentMaxVolume;
-                if (currentMaxVolume > 5 + 1024 * 1024) {
-                    currentMaxVolume = 5 * 1024 * 1024;
+                currentVolumePerRequest += d * currentVolumePerRequest;
+                if (currentVolumePerRequest > maxVolumePerRequest) {
+                    currentVolumePerRequest = maxVolumePerRequest;
                 }
-                bulkProcessor.setMaxBulkVolume(currentMaxVolume);
+                bulkProcessor.setMaxBulkVolume(currentVolumePerRequest);
                 if (logger.isDebugEnabled()) {
-                    logger.debug("metric: increase volume to " + currentMaxVolume);
+                    logger.debug("metric: increase volume to " + currentVolumePerRequest);
                 }
-            } else if (delta < -100) {
-                double k = 0.5;
-                double d = (1 / (1 + Math.exp(-(((double)x)) * k)));
-                d = -1/d;
-                logger.debug("dec: x = " + x + " d = " + d);
-                currentMaxVolume += d * currentMaxVolume;
-                if (currentMaxVolume > 5 + 1024 * 1024) {
-                    currentMaxVolume = 5 * 1024 * 1024;
-                }
-                bulkProcessor.setMaxBulkVolume(currentMaxVolume);
+            } else if (deltaPercent > 10 || stat1.getMax() >= 5000) {
+                currentVolumePerRequest = minVolumePerRequest;
+                bulkProcessor.setMaxBulkVolume(currentVolumePerRequest);
                 if (logger.isDebugEnabled()) {
-                    logger.debug("metric: decrease volume to " + currentMaxVolume);
+                    logger.debug("metric: decrease volume to " + currentVolumePerRequest);
                 }
             }
             lastThroughput = throughput;
         }
     }
+
+    private void log() {
+        long docs = getSucceeded().getCount();
+        long elapsed = elapsed() / 1000000; // nano to millis
+        double dps = docs * 1000.0 / elapsed;
+        long bytes = getTotalIngestSizeInBytes().getCount();
+        double avg = bytes / (docs + 1.0); // avoid div by zero
+        double bps = bytes * 1000.0 / elapsed;
+        if (logger.isInfoEnabled()) {
+            logger.log(Level.INFO, "{} docs, {} ms = {}, {} = {}, {} = {} avg, {} = {}, {} = {}",
+                    docs,
+                    elapsed,
+                    FormatUtil.formatDurationWords(elapsed, true, true),
+                    bytes,
+                    FormatUtil.formatSize(bytes),
+                    avg,
+                    FormatUtil.formatSize(avg),
+                    dps,
+                    FormatUtil.formatDocumentSpeed(dps),
+                    bps,
+                    FormatUtil.formatSpeed(bps)
+            );
+        }
+    }
 }

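recalculate(...) now ramps the per-request bulk volume along a logistic curve between the configured MIN_VOLUME_PER_REQUEST and MAX_VOLUME_PER_REQUEST, and snaps back to the minimum when deltaPercent exceeds 10 or the slowest response in the ring buffer reaches 5000 ms. A small worked sketch of the growth step (in the real class x is a field whose update is outside the hunks shown, so a loop variable stands in here; the "1k" and "1m" defaults are the shipped values):

    // Worked sketch of the volume growth used above; values illustrative.
    long minVolumePerRequest = 1024L;               // "1k" default
    long maxVolumePerRequest = 1024L * 1024L;       // "1m" default
    long currentVolumePerRequest = minVolumePerRequest;
    double k = 0.5;
    for (int x = 0; x < 6; x++) {
        double d = 1 / (1 + Math.exp(-((double) x) * k));   // logistic factor rising from 0.5 toward 1
        currentVolumePerRequest += d * currentVolumePerRequest;
        if (currentVolumePerRequest > maxVolumePerRequest) {
            currentVolumePerRequest = maxVolumePerRequest;   // clamp at the configured maximum
        }
        System.out.println("step " + x + ": " + currentVolumePerRequest + " bytes");
    }
    // step 0: 1536, step 1: 2492, step 2: 4313, ... growth accelerates until the clamp applies.
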
@ -11,16 +11,13 @@ import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.xbib.elx.api.BulkClient;
 import org.xbib.elx.api.BulkMetric;
 import org.xbib.elx.api.BulkProcessor;
 import org.xbib.elx.api.IndexDefinition;

 import java.io.IOException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@ -45,72 +42,51 @@ public class DefaultBulkProcessor implements BulkProcessor {

     private final DefaultBulkListener bulkListener;

-    private final ScheduledThreadPoolExecutor scheduler;
-
     private ScheduledFuture<?> scheduledFuture;

     private BulkRequest bulkRequest;

-    private long maxBulkVolume;
+    private long bulkVolume;

-    private int maxBulkActions;
+    private int bulkActions;

     private final AtomicBoolean closed;

     private final AtomicLong executionIdGen;

-    private ResizeableSemaphore semaphore;
+    private final ResizeableSemaphore semaphore;

-    private int permits;
+    private final int permits;

     public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
         this.bulkClient = bulkClient;
         int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(),
                 Parameters.MAX_ACTIONS_PER_REQUEST.getInteger());
-        int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(),
-                Parameters.MAX_CONCURRENT_REQUESTS.getInteger());
-        String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(),
+        String flushIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(),
                 Parameters.FLUSH_INTERVAL.getString());
-        TimeValue flushInterval = TimeValue.parseTimeValue(flushIngestIntervalStr,
+        TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr,
                 TimeValue.timeValueSeconds(30), "");
-        ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(),
-                ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m"));
-        boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(),
-                Parameters.ENABLE_BULK_LOGGING.getBoolean());
-        boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
-                Parameters.FAIL_ON_BULK_ERROR.getBoolean());
-        int ringBufferSize = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(),
-                Parameters.RESPONSE_TIME_COUNT.getInteger());
+        ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.MIN_VOLUME_PER_REQUEST.getName(),
+                ByteSizeValue.parseBytesSizeValue(Parameters.MIN_VOLUME_PER_REQUEST.getString(), "1k"));
         this.client = bulkClient.getClient();
-        this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2,
-                EsExecutors.daemonThreadFactory("elx-bulk-processor"));
-        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
         if (flushInterval.millis() > 0L) {
-            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this::flush, flushInterval.millis(),
+            this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
                     flushInterval.millis(), TimeUnit.MILLISECONDS);
         }
-        this.bulkListener = new DefaultBulkListener(this, scheduler,
-                enableBulkLogging, failOnBulkError, ringBufferSize);
-        this.permits = maxConcurrentRequests;
-        this.maxBulkActions = maxActionsPerRequest;
-        this.maxBulkVolume = maxVolumePerRequest != null ? maxVolumePerRequest.getBytes() : -1;
+        this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings);
+        this.bulkActions = maxActionsPerRequest;
+        this.bulkVolume = minVolumePerRequest.getBytes();
         this.bulkRequest = new BulkRequest();
         this.closed = new AtomicBoolean(false);
         this.enabled = new AtomicBoolean(false);
         this.executionIdGen = new AtomicLong();
-        if (permits > 0) {
-            this.semaphore = new ResizeableSemaphore(permits);
+        this.permits = settings.getAsInt(Parameters.PERMITS.getName(), Parameters.PERMITS.getInteger());
+        if (permits < 1) {
+            throw new IllegalArgumentException("must not be less 1 permits for bulk indexing");
         }
+        this.semaphore = new ResizeableSemaphore(permits);
         if (logger.isInfoEnabled()) {
-            logger.info("bulk processor now active with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
-                    "flushInterval = {} maxVolumePerRequest = {} " +
-                    "bulk logging = {} fail on bulk error = {} " +
-                    "logger debug = {} from settings = {}",
-                    maxActionsPerRequest, maxConcurrentRequests,
-                    flushInterval, maxVolumePerRequest,
-                    enableBulkLogging, failOnBulkError,
-                    logger.isDebugEnabled(), settings.toDelimitedString(','));
+            logger.info("bulk processor now active");
         }
         setEnabled(true);
     }
@ -149,22 +125,22 @@ public class DefaultBulkProcessor implements BulkProcessor {

     @Override
     public void setMaxBulkActions(int bulkActions) {
-        this.maxBulkActions = bulkActions;
+        this.bulkActions = bulkActions;
     }

     @Override
     public int getMaxBulkActions() {
-        return maxBulkActions;
+        return bulkActions;
     }

     @Override
     public void setMaxBulkVolume(long bulkSize) {
-        this.maxBulkVolume = bulkSize;
+        this.bulkVolume = bulkSize;
     }

     @Override
     public long getMaxBulkVolume() {
-        return maxBulkVolume;
+        return bulkVolume;
     }

     @Override
@ -181,8 +157,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
     public synchronized void add(ActionRequest<?> request) {
         ensureOpenAndActive();
         bulkRequest.add(request);
-        if ((maxBulkActions != -1 && bulkRequest.numberOfActions() >= maxBulkActions) ||
-                (maxBulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= maxBulkVolume)) {
+        if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) ||
+                (bulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= bulkVolume)) {
             execute();
         }
     }
@ -221,9 +197,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
         if (scheduledFuture != null) {
             scheduledFuture.cancel(true);
         }
-        if (scheduler != null) {
-            scheduler.shutdown();
-        }
         // like flush but without ensuring open
         if (bulkRequest.numberOfActions() > 0) {
             execute();
@ -294,9 +267,13 @@ public class DefaultBulkProcessor implements BulkProcessor {

     private boolean drainSemaphore(long timeValue, TimeUnit timeUnit) throws InterruptedException {
         if (semaphore != null) {
-            if (semaphore.tryAcquire(permits, timeValue, timeUnit)) {
-                semaphore.release(permits);
+            if (permits <= 0) {
                 return true;
+            } else {
+                if (semaphore.tryAcquire(permits, timeValue, timeUnit)) {
+                    semaphore.release(permits);
+                    return true;
+                }
             }
         }
         return false;
@ -311,6 +288,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
         }
     }

+    @SuppressWarnings("serial")
     private static class ResizeableSemaphore extends Semaphore {

         ResizeableSemaphore(int permits) {

@ -3,17 +3,22 @@ package org.xbib.elx.common;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.xbib.elx.api.SearchMetric;
 import org.xbib.metrics.api.Count;
 import org.xbib.metrics.api.Metered;
 import org.xbib.metrics.common.CountMetric;
 import org.xbib.metrics.common.Meter;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 public class DefaultSearchMetric implements SearchMetric {

     private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName());

+    private final ScheduledFuture<?> future;
+
     private final Meter totalQuery;

     private final Count currentQuery;
@ -32,19 +37,24 @@ public class DefaultSearchMetric implements SearchMetric {

     private Long stopped;

-    public DefaultSearchMetric() {
-        totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor());
+    public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
+                               Settings settings) {
+        totalQuery = new Meter(scheduledThreadPoolExecutor);
         currentQuery = new CountMetric();
         queries = new CountMetric();
         succeededQueries = new CountMetric();
         emptyQueries = new CountMetric();
         failedQueries = new CountMetric();
         timeoutQueries = new CountMetric();
+        String metricLogIntervalStr = settings.get(Parameters.METRIC_LOG_INTERVAL.getName(),
+                Parameters.METRIC_LOG_INTERVAL.getString());
+        TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
+                TimeValue.timeValueSeconds(10), "");
+        this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
     }

     @Override
     public void init(Settings settings) {
-        logger.info("init");
         start();
     }

@ -103,6 +113,8 @@ public class DefaultSearchMetric implements SearchMetric {
     public void stop() {
         this.stopped = System.nanoTime();
         totalQuery.stop();
+        log();
+        this.future.cancel(true);
     }

     @Override
@ -110,4 +122,10 @@ public class DefaultSearchMetric implements SearchMetric {
         stop();
         totalQuery.shutdown();
     }
+
+    private void log() {
+        if (logger.isInfoEnabled()) {
+            logger.info("docs = " + getTotalQueries().getCount());
+        }
+    }
 }

436
elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java
Normal file
436
elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java
Normal file
|
@ -0,0 +1,436 @@
|
||||||
|
package org.xbib.elx.common;
|
||||||
|
|
||||||
|
import org.xbib.time.pretty.PrettyTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Taken from org,apache.commons.lang.DurationFormatUtils of Apache commons-lang.
|
||||||
|
*/
|
||||||
|
public class FormatUtil {
|
||||||
|
|
||||||
|
private static final PrettyTime pretty = new PrettyTime();
|
||||||
|
|
||||||
|
private static final String EMPTY = "";
|
||||||
|
private static final String YEAR = "y";
|
||||||
|
private static final String MONTH = "M";
|
||||||
|
private static final String DAY = "d";
|
||||||
|
private static final String HOUR = "H";
|
||||||
|
private static final String MINUTE = "m";
|
||||||
|
private static final String SECOND = "s";
|
||||||
|
private static final String MILLISECOND = "S";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of milliseconds in a standard second.
|
||||||
|
*/
|
||||||
|
private static final long MILLIS_PER_SECOND = 1000;
|
||||||
|
/**
|
||||||
|
* Number of milliseconds in a standard minute.
|
||||||
|
*/
|
||||||
|
private static final long MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND;
|
||||||
|
/**
|
||||||
|
* Number of milliseconds in a standard hour.
|
||||||
|
*/
|
||||||
|
private static final long MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE;
|
||||||
|
/**
|
||||||
|
* Number of milliseconds in a standard day.
|
||||||
|
*/
|
||||||
|
private static final long MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR;
|
||||||
|
|
||||||
|
private static final String[] BYTES = {
|
||||||
|
" B", " kB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB"
|
||||||
|
};
|
||||||
|
private static final String[] BYTES_PER_SECOND = {
|
||||||
|
" B/s", " kB/s", " MB/s", " GB/s", " TB/s", " PB/s", " EB/s", " ZB/s", " YB/s"
|
||||||
|
};
|
||||||
|
private static final String[] DOCS_PER_SECOND = {
|
||||||
|
" dps", " kdps", " Mdps", " Gdps", " Tdps", " Pdps", " Edps", " Zdps", " Ydps"
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Format byte size (file size as example) into a string,
|
||||||
|
* with two digits after dot and actual measure (MB, GB or other).
|
||||||
|
*
|
||||||
|
* @param size value to format
|
||||||
|
* @return formatted string in bytes, kB, MB or other.
|
||||||
|
*/
|
||||||
|
public static String formatSize(long size) {
|
||||||
|
return format(size, BYTES, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatSize(double size) {
|
||||||
|
return format(size, BYTES, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Format speed values (copy speed as example) into a string
|
||||||
|
* with two digits after dot and actual measure (MB/s, GB/s or other).
|
||||||
|
*
|
||||||
|
* @param speed value to format
|
||||||
|
* @return formatted string in bytes/s, kB/s, MB/s or other.
|
||||||
|
*/
|
||||||
|
public static String formatSpeed(long speed) {
|
||||||
|
return format(speed, BYTES_PER_SECOND, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatSpeed(double speed) {
|
||||||
|
return format(speed, BYTES_PER_SECOND, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatDocumentSpeed(long speed) {
|
||||||
|
return format(speed, DOCS_PER_SECOND, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatDocumentSpeed(double speed) {
|
||||||
|
return format(speed, DOCS_PER_SECOND, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
    /**
     * Format any value without appending a string.
     *
     * @param size value to format
     * @param measureUnits array of strings to use as measurement units, for example BYTES_PER_SECOND
     * @param measureQuantity quantity required to step into the next unit, like 1024 for bytes,
     *                        1000 for meters, or 100 for a century
     * @return formatted size with measure unit
     */
    private static String format(long size, String[] measureUnits, int measureQuantity) {
        if (size <= 0) {
            return null;
        }
        if (size < measureQuantity) {
            return size + measureUnits[0];
        }
        int i = 1;
        double d = size;
        while ((d = d / measureQuantity) > (measureQuantity - 1)) {
            i++;
        }
        long l = (long) (d * 100);
        d = (double) l / 100;
        if (i < measureUnits.length) {
            return d + measureUnits[i];
        }
        return String.valueOf(size);
    }

    private static String format(double value, String[] measureUnits, int measureQuantity) {
        double d = value;
        if (d <= 0.0d) {
            return null;
        }
        if (d < measureQuantity) {
            return d + measureUnits[0];
        }
        int i = 1;
        while ((d = d / measureQuantity) > (measureQuantity - 1)) {
            i++;
        }
        long l = (long) (d * 100);
        d = (double) l / 100;
        if (i < measureUnits.length) {
            return d + measureUnits[i];
        }
        return String.valueOf(d);
    }
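    // Illustrative walk-through of the unit-stepping loop in format(...) above (values are made up):
    //   format(2_621_440L, BYTES, 1024)
    //     first step:  d = 2_621_440 / 1024 = 2560, still above 1023, so advance to the next unit (i = 2)
    //     second step: d = 2560 / 1024 = 2.5, below the threshold, so stop
    //     truncate to two decimals and append BYTES[2] -> "2.5 MB"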
    public static String formatMillis(long millis) {
        return pretty.format(pretty.calculateDuration(millis));
    }

    public static String formatDurationWords(long value, boolean suppressLeadingZeroElements,
                                             boolean suppressTrailingZeroElements) {
        // This method is generally replaceable by the format method, but
        // there are a series of tweaks and special cases that require
        // trickery to replicate.
        String duration = formatDuration(value, "d' days 'H' hours 'm' minutes 's' seconds'");
        if (suppressLeadingZeroElements) {
            // this is a temporary marker on the front, like ^ in a regexp
            duration = " " + duration;
            String tmp = replaceOnce(duration, " 0 days", "");
            if (tmp.length() != duration.length()) {
                duration = tmp;
                tmp = replaceOnce(duration, " 0 hours", "");
                if (tmp.length() != duration.length()) {
                    duration = tmp;
                    tmp = replaceOnce(duration, " 0 minutes", "");
                    if (tmp.length() != duration.length()) {
                        duration = replaceOnce(tmp, " 0 seconds", "");
                    }
                }
            }
            if (duration.length() != 0) {
                // strip the space off again
                duration = duration.substring(1);
            }
        }
        if (suppressTrailingZeroElements) {
            String tmp = replaceOnce(duration, " 0 seconds", "");
            if (tmp != null && tmp.length() != duration.length()) {
                duration = tmp;
                tmp = replaceOnce(duration, " 0 minutes", "");
                if (tmp.length() != duration.length()) {
                    duration = tmp;
                    tmp = replaceOnce(duration, " 0 hours", "");
                    if (tmp.length() != duration.length()) {
                        duration = replaceOnce(tmp, " 0 days", "");
                    }
                }
            }
        }
        duration = " " + duration;
        duration = replaceOnce(duration, " 1 seconds", " 1 second");
        duration = replaceOnce(duration, " 1 minutes", " 1 minute");
        duration = replaceOnce(duration, " 1 hours", " 1 hour");
        duration = replaceOnce(duration, " 1 days", " 1 day");
        return duration != null ? duration.trim() : null;
    }

    public static String formatDuration(long millis, String format) {
        long durationMillis = millis;
        List<Token> tokens = lexx(format);
        int days = 0;
        int hours = 0;
        int minutes = 0;
        int seconds = 0;
        int milliseconds = 0;
        if (Token.containsTokenWithValue(tokens, DAY)) {
            days = (int) (durationMillis / MILLIS_PER_DAY);
            durationMillis -= days * MILLIS_PER_DAY;
        }
        if (Token.containsTokenWithValue(tokens, HOUR)) {
            hours = (int) (durationMillis / MILLIS_PER_HOUR);
            durationMillis -= hours * MILLIS_PER_HOUR;
        }
        if (Token.containsTokenWithValue(tokens, MINUTE)) {
            minutes = (int) (durationMillis / MILLIS_PER_MINUTE);
            durationMillis -= minutes * MILLIS_PER_MINUTE;
        }
        if (Token.containsTokenWithValue(tokens, SECOND)) {
            seconds = (int) (durationMillis / MILLIS_PER_SECOND);
            durationMillis -= seconds * MILLIS_PER_SECOND;
        }
        if (Token.containsTokenWithValue(tokens, MILLISECOND)) {
            milliseconds = (int) durationMillis;
        }
        return format(tokens, days, hours, minutes, seconds, milliseconds);
    }
    /**
     * <p>The internal method to do the formatting.</p>
     *
     * @param tokens the tokens
     * @param days the number of days
     * @param hours the number of hours
     * @param minutes the number of minutes
     * @param seconds the number of seconds
     * @param millis the number of millis
     * @return the formatted string
     */
    private static String format(List<Token> tokens,
                                 int days, int hours, int minutes, int seconds, int millis) {
        int milliseconds = millis;
        StringBuilder buffer = new StringBuilder();
        boolean lastOutputSeconds = false;
        for (Token token : tokens) {
            Object value = token.getValue();
            if (value instanceof StringBuilder) {
                buffer.append(value);
            } else {
                if (DAY.equals(value)) {
                    buffer.append(days);
                    lastOutputSeconds = false;
                } else if (HOUR.equals(value)) {
                    buffer.append(hours);
                    lastOutputSeconds = false;
                } else if (MINUTE.equals(value)) {
                    buffer.append(minutes);
                    lastOutputSeconds = false;
                } else if (SECOND.equals(value)) {
                    buffer.append(seconds);
                    lastOutputSeconds = true;
                } else if (MILLISECOND.equals(value)) {
                    if (lastOutputSeconds) {
                        milliseconds += 1000;
                        String str = Integer.toString(milliseconds);
                        buffer.append(str.substring(1));
                    } else {
                        buffer.append(milliseconds);
                    }
                    lastOutputSeconds = false;
                }
            }
        }
        return buffer.toString();
    }
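    // Illustrative note on the millisecond branch above: when seconds were just emitted,
    // milliseconds are zero-padded by adding 1000 and dropping the leading "1", e.g.
    //   seconds = 5, milliseconds = 7, pattern "s'.'S"  ->  7 + 1000 = 1007 -> "007"  ->  "5.007"
    // Without the padding the same input would render as the misleading "5.7".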
    /**
     * Parses a classic date format string into Tokens.
     *
     * @param format to parse
     * @return list of Token
     */
    private static List<Token> lexx(String format) {
        char[] array = format.toCharArray();
        List<Token> list = new ArrayList<>(array.length);
        boolean inLiteral = false;
        StringBuilder sb = new StringBuilder();
        Token previous = null;
        for (char ch : array) {
            if (inLiteral && ch != '\'') {
                sb.append(ch);
                continue;
            }
            Object value = null;
            switch (ch) {
                case '\'':
                    if (inLiteral) {
                        sb = new StringBuilder();
                        inLiteral = false;
                    } else {
                        sb = new StringBuilder();
                        list.add(new Token(sb));
                        inLiteral = true;
                    }
                    break;
                case 'y':
                    value = YEAR;
                    break;
                case 'M':
                    value = MONTH;
                    break;
                case 'd':
                    value = DAY;
                    break;
                case 'H':
                    value = HOUR;
                    break;
                case 'm':
                    value = MINUTE;
                    break;
                case 's':
                    value = SECOND;
                    break;
                case 'S':
                    value = MILLISECOND;
                    break;
                default:
                    if (sb.length() == 0) {
                        sb = new StringBuilder();
                        list.add(new Token(sb));
                    }
                    sb.append(ch);
            }
            if (value != null) {
                if (previous != null && value.equals(previous.getValue())) {
                    previous.increment();
                } else {
                    Token token = new Token(value);
                    list.add(token);
                    previous = token;
                }
                sb.setLength(0);
            }
        }
        return list;
    }
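    // Illustrative tokenization: lexx("H' hours 'm' minutes'") yields
    //   [HOUR, " hours ", MINUTE, " minutes"]
    // and a repeated letter such as "HH" collapses into a single HOUR token with count 2.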
    private static String replaceOnce(String text, String searchString, String replacement) {
        return replace(text, searchString, replacement, 1);
    }

    private static String replace(String text, String searchString, String replacement, int maxvalue) {
        int max = maxvalue;
        if (isNullOrEmpty(text) || isNullOrEmpty(searchString) || replacement == null || max == 0) {
            return text;
        }
        int start = 0;
        int end = text.indexOf(searchString, start);
        if (end == -1) {
            return text;
        }
        int replLength = searchString.length();
        int increase = replacement.length() - replLength;
        increase = Math.max(increase, 0);
        increase *= (max < 0 ? 16 : (Math.min(max, 64)));
        StringBuilder buf = new StringBuilder(text.length() + increase);
        while (end != -1) {
            buf.append(text, start, end).append(replacement);
            start = end + replLength;
            if (--max == 0) {
                break;
            }
            end = text.indexOf(searchString, start);
        }
        buf.append(text.substring(start));
        return buf.toString();
    }

    private static boolean isNullOrEmpty(String target) {
        return target == null || EMPTY.equals(target);
    }
    /**
     * Element that is parsed from the format pattern.
     */
    private static class Token {

        private final Object value;

        private int count;

        Token(Object value) {
            this.value = value;
            this.count = 1;
        }

        static boolean containsTokenWithValue(List<Token> tokens, Object value) {
            for (Token token : tokens) {
                if (token.getValue().equals(value)) {
                    return true;
                }
            }
            return false;
        }

        void increment() {
            count++;
        }

        Object getValue() {
            return value;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof Token) {
                Token tok = (Token) obj;
                if (this.value.getClass() != tok.value.getClass()) {
                    return false;
                }
                if (this.count != tok.count) {
                    return false;
                }
                if (this.value instanceof StringBuilder) {
                    return this.value.toString().equals(tok.value.toString());
                } else if (this.value instanceof Number) {
                    return this.value.equals(tok.value);
                } else {
                    return this.value == tok.value;
                }
            }
            return false;
        }

        @Override
        public int hashCode() {
            return this.value.hashCode();
        }

        @Override
        public String toString() {
            return value + " (" + count + ")";
        }
    }
}
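A quick, hedged illustration of how the FormatUtil helpers added above behave. The class name FormatUtilExample, the main method, and the sample values are made up for this sketch; it assumes FormatUtil is on the classpath (same package or imported), and the PrettyTime-backed formatMillis output depends on the xbib time dependency introduced by this commit.

// Illustrative only: exercising the FormatUtil helpers added in this commit.
public class FormatUtilExample {

    public static void main(String[] args) {
        // byte sizes step by 1024: 1536 bytes -> "1.5 kB"
        System.out.println(FormatUtil.formatSize(1536L));

        // throughput in bytes per second and documents per second
        System.out.println(FormatUtil.formatSpeed(2_621_440L));      // "2.5 MB/s"
        System.out.println(FormatUtil.formatDocumentSpeed(12_000L)); // "11.71 kdps"

        // durations from milliseconds, either as words or with an explicit pattern
        System.out.println(FormatUtil.formatDurationWords(90_061_000L, true, true)); // "1 day 1 hour 1 minute 1 second"
        System.out.println(FormatUtil.formatDuration(90_061_000L, "d'd 'H'h 'm'm 's's'")); // "1d 1h 1m 1s"
    }
}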
@@ -12,17 +12,23 @@ public enum Parameters {
 
     ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true),
 
-    FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true),
+    FAIL_ON_BULK_ERROR("bulk.fail_on_error", Boolean.class, true),
 
     MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1),
 
-    RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 16),
+    MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"),
 
-    MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, 1 /*Runtime.getRuntime().availableProcessors() - 1*/),
+    MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "5m"),
 
-    MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1kb"),
+    FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"),
 
-    FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s");
+    MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
+
+    METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"),
+
+    RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
+
+    PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1);
 
     private final String name;
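As a hedged sketch only: the renamed and newly added bulk parameter keys above would typically be supplied to the client as Elasticsearch settings. The key names mirror the enum; the values are examples, and the exact builder factory depends on the Elasticsearch line (Settings.settingsBuilder() on 2.x, Settings.builder() on newer versions). How the resulting Settings object reaches the elx bulk client is not shown here.

import org.elasticsearch.common.settings.Settings;

// Hypothetical configuration sketch; replace Settings.builder() with
// Settings.settingsBuilder() when building against the Elasticsearch 2.x line.
Settings bulkSettings = Settings.builder()
        .put("bulk.logging.enabled", true)
        .put("bulk.fail_on_error", true)
        .put("bulk.max_actions_per_request", 1000)
        .put("bulk.min_volume_per_request", "1k")
        .put("bulk.max_volume_per_request", "5m")
        .put("bulk.flush_interval", "30s")
        .put("bulk.measure_interval", "1s")
        .put("bulk.metric_log_interval", "10s")
        .build();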
@@ -10,6 +10,7 @@ public class NodeAdminClient extends AbstractAdminClient {
     private final NodeClientHelper helper;
 
     public NodeAdminClient() {
+        super();
         this.helper = new NodeClientHelper();
     }
 
@@ -10,6 +10,7 @@ public class NodeBulkClient extends AbstractBulkClient {
     private final NodeClientHelper helper;
 
    public NodeBulkClient() {
+        super();
         this.helper = new NodeClientHelper();
     }
 
@@ -10,6 +10,7 @@ public class NodeSearchClient extends AbstractSearchClient {
     private final NodeClientHelper helper;
 
     public NodeSearchClient() {
+        super();
         this.helper = new NodeClientHelper();
     }
 
@@ -129,7 +129,6 @@ class BulkClientTest {
                 .put(helper.getNodeSettings())
                 .build()) {
             IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
-            indexDefinition.setStartBulkRefreshSeconds(0);
             bulkClient.newIndex(indexDefinition);
             bulkClient.startBulk(indexDefinition);
             ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
@@ -15,6 +15,7 @@ public class TransportAdminClient extends AbstractAdminClient {
     private final TransportClientHelper helper;
 
     public TransportAdminClient() {
+        super();
         this.helper = new TransportClientHelper();
     }
 
@@ -14,6 +14,7 @@ public class TransportBulkClient extends AbstractBulkClient {
     private final TransportClientHelper helper;
 
     public TransportBulkClient() {
+        super();
         this.helper = new TransportClientHelper();
     }
 
@@ -14,6 +14,7 @@ public class TransportSearchClient extends AbstractSearchClient {
     private final TransportClientHelper helper;
 
     public TransportSearchClient() {
+        super();
         this.helper = new TransportClientHelper();
     }
 
@@ -1,9 +1,10 @@
 group = org.xbib
 name = elx
-version = 2.2.1.37
+version = 2.2.1.39
 
 gradle.wrapper.version = 6.6.1
 xbib-metrics.version = 2.1.0
+xbib-time.version = 2.1.0
 xbib-guice.version = 4.4.2
 xbib-guava.version = 28.1
 xbib-netty-http.version = 4.1.63.0