align with es221
This commit is contained in:
parent
6af56268aa
commit
870cc09767
47 changed files with 1148 additions and 861 deletions
|
@ -149,5 +149,5 @@ public interface BulkClient extends BasicClient, Flushable {
|
|||
*/
|
||||
void flushIndex(IndexDefinition indexDefinition);
|
||||
|
||||
BulkProcessor getBulkController();
|
||||
BulkProcessor getBulkProcessor();
|
||||
}
|
||||
|
|
|
@ -15,6 +15,14 @@ public interface BulkProcessor extends Closeable, Flushable {
|
|||
|
||||
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
|
||||
|
||||
void add(DocWriteRequest<?> request);
|
||||
|
||||
boolean waitForBulkResponses(long timeout, TimeUnit unit);
|
||||
|
||||
BulkMetric getBulkMetric();
|
||||
|
||||
Throwable getLastBulkError();
|
||||
|
||||
void setMaxBulkActions(int bulkActions);
|
||||
|
||||
int getMaxBulkActions();
|
||||
|
@ -23,11 +31,4 @@ public interface BulkProcessor extends Closeable, Flushable {
|
|||
|
||||
long getMaxBulkVolume();
|
||||
|
||||
void add(DocWriteRequest<?> request);
|
||||
|
||||
boolean waitForBulkResponses(long timeout, TimeUnit unit);
|
||||
|
||||
BulkMetric getBulkMetric();
|
||||
|
||||
Throwable getLastBulkError();
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
dependencies {
|
||||
api project(':elx-api')
|
||||
implementation "org.xbib:time:${rootProject.property('xbib-time.version')}"
|
||||
testImplementation "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
|
||||
testImplementation "io.netty:netty-codec-http:${project.property('netty.version')}"
|
||||
testImplementation "io.netty:netty-transport:${project.property('netty.version')}"
|
||||
|
|
|
@ -81,7 +81,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public Map<String, ?> getMapping(IndexDefinition indexDefinition) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return null;
|
||||
}
|
||||
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
||||
|
@ -96,7 +96,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public AdminClient deleteIndex(IndexDefinition indexDefinition) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return this;
|
||||
}
|
||||
String index = indexDefinition.getFullIndexName();
|
||||
|
@ -113,7 +113,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return this;
|
||||
}
|
||||
if (level < 1) {
|
||||
|
@ -129,7 +129,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public int getReplicaLevel(IndexDefinition indexDefinition) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return -1;
|
||||
}
|
||||
String index = indexDefinition.getFullIndexName();
|
||||
|
@ -207,7 +207,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
if (additionalAliases == null) {
|
||||
return new EmptyIndexShiftResult();
|
||||
}
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return new EmptyIndexShiftResult();
|
||||
}
|
||||
if (indexDefinition.isShiftEnabled()) {
|
||||
|
@ -295,7 +295,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) {
|
||||
return indexDefinition != null && indexDefinition.isPruneEnabled() &&
|
||||
return indexDefinition != null&& indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() &&
|
||||
indexDefinition.getRetention() != null &&
|
||||
indexDefinition.getDateTimePattern() != null ?
|
||||
pruneIndex(indexDefinition.getIndex(),
|
||||
|
@ -313,9 +313,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
logger.info("before pruning: index = {} full index = {} delta = {} mintokeep = {} pattern = {}",
|
||||
index, protectedIndexName, delta, mintokeep, pattern);
|
||||
if (delta == 0 && mintokeep == 0) {
|
||||
logger.info("no candidates found, delta is 0 and mintokeep is 0");
|
||||
return new EmptyPruneResult();
|
||||
}
|
||||
if (index.equals(protectedIndexName)) {
|
||||
logger.info("no candidates found, only protected index name is given");
|
||||
return new EmptyPruneResult();
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
|
@ -330,6 +332,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
}
|
||||
}
|
||||
if (candidateIndices.isEmpty()) {
|
||||
logger.info("no candidates found");
|
||||
return new EmptyPruneResult();
|
||||
}
|
||||
if (mintokeep > 0 && candidateIndices.size() <= mintokeep) {
|
||||
|
@ -365,7 +368,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return -1L;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
|
@ -393,7 +396,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public boolean forceMerge(IndexDefinition indexDefinition) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return false;
|
||||
}
|
||||
if (!indexDefinition.isForceMergeEnabled()) {
|
||||
|
@ -433,6 +436,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public void checkMapping(IndexDefinition indexDefinition) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(indexDefinition.getFullIndexName());
|
||||
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
|
||||
|
|
|
@ -7,6 +7,12 @@ import org.elasticsearch.ElasticsearchTimeoutException;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
|
@ -22,10 +28,15 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
|
|||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolInfo;
|
||||
import org.xbib.elx.api.BasicClient;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -37,12 +48,23 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
|
||||
protected Settings settings;
|
||||
|
||||
private final ScheduledThreadPoolExecutor scheduler;
|
||||
|
||||
private final AtomicBoolean closed;
|
||||
|
||||
public AbstractBasicClient() {
|
||||
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2,
|
||||
EsExecutors.daemonThreadFactory("elx-bulk-processor"));
|
||||
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||
closed = new AtomicBoolean(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledThreadPoolExecutor getScheduler() {
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClient(ElasticsearchClient client) {
|
||||
this.client = client;
|
||||
|
@ -56,7 +78,7 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
@Override
|
||||
public void init(Settings settings) throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(','));
|
||||
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
||||
this.settings = settings;
|
||||
setClient(createClient(settings));
|
||||
}
|
||||
|
@ -82,6 +104,22 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
|
||||
ensureClientIsPresent();
|
||||
if (key == null) {
|
||||
throw new IOException("no key given");
|
||||
}
|
||||
if (value == null) {
|
||||
throw new IOException("no value given");
|
||||
}
|
||||
Settings.Builder updateSettingsBuilder = Settings.builder();
|
||||
updateSettingsBuilder.put(key, value.toString());
|
||||
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
|
||||
updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit));
|
||||
client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) {
|
||||
ensureClientIsPresent();
|
||||
|
@ -140,6 +178,10 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
|
||||
@Override
|
||||
public long getSearchableDocs(IndexDefinition indexDefinition) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return -1L;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||
.setIndices(indexDefinition.getFullIndexName())
|
||||
.setQuery(QueryBuilders.matchAllQuery())
|
||||
|
@ -162,6 +204,9 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
ensureClientIsPresent();
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
closeClient(settings);
|
||||
if (scheduler != null) {
|
||||
scheduler.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,12 +238,12 @@ public abstract class AbstractBasicClient implements BasicClient {
|
|||
}
|
||||
}
|
||||
|
||||
protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) {
|
||||
protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) {
|
||||
if (!indexDefinition.isEnabled()) {
|
||||
logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled");
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) {
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||
|
@ -23,7 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.xbib.elx.api.BulkClient;
|
||||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkProcessor;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -37,29 +36,32 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName());
|
||||
|
||||
private BulkController bulkController;
|
||||
private BulkProcessor bulkProcessor;
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(true);
|
||||
private final AtomicBoolean closed;
|
||||
|
||||
public AbstractBulkClient() {
|
||||
super();
|
||||
closed = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) throws IOException {
|
||||
if (closed.compareAndSet(true, false)) {
|
||||
super.init(settings);
|
||||
logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(','));
|
||||
bulkController = new DefaultBulkController(this);
|
||||
bulkController.init(settings);
|
||||
bulkProcessor = new DefaultBulkProcessor(this, settings);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkController getBulkController() {
|
||||
return bulkController;
|
||||
public BulkProcessor getBulkProcessor() {
|
||||
return bulkProcessor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
if (bulkController != null) {
|
||||
bulkController.flush();
|
||||
if (bulkProcessor != null) {
|
||||
bulkProcessor.flush();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -67,17 +69,18 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
public void close() throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
ensureClientIsPresent();
|
||||
if (bulkController != null) {
|
||||
logger.info("closing bulk controller");
|
||||
bulkController.close();
|
||||
if (bulkProcessor != null) {
|
||||
logger.info("closing bulk processor");
|
||||
bulkProcessor.close();
|
||||
}
|
||||
closeClient(settings);
|
||||
super.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newIndex(IndexDefinition indexDefinition) throws IOException {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return;
|
||||
}
|
||||
String index = indexDefinition.getFullIndexName();
|
||||
|
@ -121,23 +124,23 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public void startBulk(IndexDefinition indexDefinition) throws IOException {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return;
|
||||
}
|
||||
if (bulkController != null) {
|
||||
if (bulkProcessor != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.startBulkMode(indexDefinition);
|
||||
bulkProcessor.startBulkMode(indexDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopBulk(IndexDefinition indexDefinition) throws IOException {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return;
|
||||
}
|
||||
if (bulkController != null) {
|
||||
if (bulkProcessor != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.stopBulkMode(indexDefinition);
|
||||
bulkProcessor.stopBulkMode(indexDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,7 +151,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return this;
|
||||
}
|
||||
return index(new IndexRequest()
|
||||
|
@ -158,16 +161,14 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient index(IndexRequest indexRequest) {
|
||||
if (bulkController != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.bulkIndex(indexRequest);
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
bulkProcessor.add(indexRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkClient delete(IndexDefinition indexDefinition, String id) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return this;
|
||||
}
|
||||
return delete(new DeleteRequest().index(indexDefinition.getFullIndexName()).id(id));
|
||||
|
@ -175,9 +176,9 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient delete(DeleteRequest deleteRequest) {
|
||||
if (bulkController != null) {
|
||||
if (bulkProcessor != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.bulkDelete(deleteRequest);
|
||||
bulkProcessor.add(deleteRequest);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -189,7 +190,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return this;
|
||||
}
|
||||
return update(new UpdateRequest()
|
||||
|
@ -199,17 +200,20 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public BulkClient update(UpdateRequest updateRequest) {
|
||||
if (bulkController != null) {
|
||||
if (bulkProcessor != null) {
|
||||
ensureClientIsPresent();
|
||||
bulkController.bulkUpdate(updateRequest);
|
||||
bulkProcessor.add(updateRequest);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
|
||||
ensureClientIsPresent();
|
||||
return bulkController.waitForBulkResponses(timeout, timeUnit);
|
||||
if (bulkProcessor != null) {
|
||||
ensureClientIsPresent();
|
||||
return bulkProcessor.waitForBulkResponses(timeout, timeUnit);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -219,7 +223,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public void flushIndex(IndexDefinition indexDefinition) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
|
@ -228,7 +232,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
|||
|
||||
@Override
|
||||
public void refreshIndex(IndexDefinition indexDefinition) {
|
||||
if (!ensureIndexDefinition(indexDefinition)) {
|
||||
if (isIndexDefinitionDisabled(indexDefinition)) {
|
||||
return;
|
||||
}
|
||||
ensureClientIsPresent();
|
||||
|
|
|
@ -36,6 +36,13 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
|
||||
private SearchMetric searchMetric;
|
||||
|
||||
private final AtomicBoolean closed;
|
||||
|
||||
public AbstractSearchClient() {
|
||||
super();
|
||||
this.closed = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchMetric getSearchMetric() {
|
||||
return searchMetric;
|
||||
|
@ -43,16 +50,20 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
|||
|
||||
@Override
|
||||
public void init(Settings settings) throws IOException {
|
||||
super.init(settings);
|
||||
this.searchMetric = new DefaultSearchMetric();
|
||||
searchMetric.init(settings);
|
||||
if (closed.compareAndSet(true, false)) {
|
||||
super.init(settings);
|
||||
this.searchMetric = new DefaultSearchMetric(getScheduler(), settings);
|
||||
searchMetric.init(settings);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (searchMetric != null) {
|
||||
searchMetric.close();
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
super.close();
|
||||
if (searchMetric != null) {
|
||||
searchMetric.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,222 +0,0 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.xbib.elx.api.BulkClient;
|
||||
import org.xbib.elx.api.BulkController;
|
||||
import org.xbib.elx.api.BulkListener;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.BulkProcessor;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.xbib.elx.api.IndexDefinition.TYPE_NAME;
|
||||
|
||||
public class DefaultBulkController implements BulkController {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(DefaultBulkController.class);
|
||||
|
||||
private final BulkClient bulkClient;
|
||||
|
||||
private final BulkMetric bulkMetric;
|
||||
|
||||
private BulkProcessor bulkProcessor;
|
||||
|
||||
private BulkListener bulkListener;
|
||||
|
||||
private long maxWaitTime;
|
||||
|
||||
private TimeUnit maxWaitTimeUnit;
|
||||
|
||||
private final AtomicBoolean active;
|
||||
|
||||
public DefaultBulkController(BulkClient bulkClient) {
|
||||
this.bulkClient = bulkClient;
|
||||
this.bulkMetric = new DefaultBulkMetric();
|
||||
this.active = new AtomicBoolean(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) {
|
||||
bulkMetric.init(settings);
|
||||
String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(),
|
||||
Parameters.MAX_WAIT_BULK_RESPONSE.getString());
|
||||
TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr,
|
||||
TimeValue.timeValueSeconds(30), "");
|
||||
this.maxWaitTime = maxWaitTimeValue.seconds();
|
||||
this.maxWaitTimeUnit = TimeUnit.SECONDS;
|
||||
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(),
|
||||
Parameters.MAX_ACTIONS_PER_REQUEST.getInteger());
|
||||
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(),
|
||||
Parameters.MAX_CONCURRENT_REQUESTS.getInteger());
|
||||
String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(),
|
||||
Parameters.FLUSH_INTERVAL.getString());
|
||||
TimeValue flushIngestInterval = TimeValue.parseTimeValue(flushIngestIntervalStr,
|
||||
TimeValue.timeValueSeconds(30), "");
|
||||
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m"));
|
||||
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(),
|
||||
Parameters.ENABLE_BULK_LOGGING.getBoolean());
|
||||
boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
|
||||
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
|
||||
int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(),
|
||||
Parameters.RESPONSE_TIME_COUNT.getInteger());
|
||||
this.bulkListener = new DefaultBulkListener(this, bulkMetric,
|
||||
enableBulkLogging, failOnBulkError, responseTimeCount);
|
||||
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
|
||||
.setBulkActions(maxActionsPerRequest)
|
||||
.setConcurrentRequests(maxConcurrentRequests)
|
||||
.setFlushInterval(flushIngestInterval)
|
||||
.setBulkSize(maxVolumePerRequest)
|
||||
.build();
|
||||
this.active.set(true);
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("bulk processor now active with maxWaitTime = {} maxActionsPerRequest = {} maxConcurrentRequests = {} " +
|
||||
"flushIngestInterval = {} maxVolumePerRequest = {} " +
|
||||
"bulk logging = {} fail on bulk error = {} " +
|
||||
"logger debug = {} from settings = {}",
|
||||
maxWaitTimeStr, maxActionsPerRequest, maxConcurrentRequests,
|
||||
flushIngestInterval, maxVolumePerRequest,
|
||||
enableBulkLogging, failOnBulkError,
|
||||
logger.isDebugEnabled(), settings.toDelimitedString(','));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkProcessor getBulkProcessor() {
|
||||
return bulkProcessor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkMetric getBulkMetric() {
|
||||
return bulkMetric;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable getLastBulkError() {
|
||||
return bulkListener != null ? bulkListener.getLastBulkError() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inactivate() {
|
||||
this.active.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
|
||||
String indexName = indexDefinition.getFullIndexName();
|
||||
if (indexDefinition.getStartBulkRefreshSeconds() != 0) {
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval",
|
||||
indexDefinition.getStartBulkRefreshSeconds() + "s",
|
||||
30L, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkIndex(IndexRequest indexRequest) {
|
||||
ensureActiveAndBulk();
|
||||
try {
|
||||
bulkMetric.getCurrentIngest().inc(indexRequest.index(), TYPE_NAME, indexRequest.id());
|
||||
bulkProcessor.add(indexRequest);
|
||||
} catch (Exception e) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("bulk add of index failed: " + e.getMessage(), e);
|
||||
}
|
||||
inactivate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkDelete(DeleteRequest deleteRequest) {
|
||||
ensureActiveAndBulk();
|
||||
try {
|
||||
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), TYPE_NAME, deleteRequest.id());
|
||||
bulkProcessor.add(deleteRequest);
|
||||
} catch (Exception e) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("bulk add of delete failed: " + e.getMessage(), e);
|
||||
}
|
||||
inactivate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bulkUpdate(UpdateRequest updateRequest) {
|
||||
ensureActiveAndBulk();
|
||||
try {
|
||||
bulkMetric.getCurrentIngest().inc(updateRequest.index(), TYPE_NAME, updateRequest.id());
|
||||
bulkProcessor.add(updateRequest);
|
||||
} catch (Exception e) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("bulk add of update failed: " + e.getMessage(), e);
|
||||
}
|
||||
inactivate();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
|
||||
try {
|
||||
if (bulkProcessor != null) {
|
||||
bulkProcessor.flush();
|
||||
return bulkProcessor.awaitFlush(timeout, timeUnit);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("interrupted");
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
|
||||
flush();
|
||||
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
|
||||
if (indexDefinition.getStopBulkRefreshSeconds() != 0) {
|
||||
bulkClient.updateIndexSetting(indexDefinition.getFullIndexName(),
|
||||
"refresh_interval",
|
||||
indexDefinition.getStopBulkRefreshSeconds() + "s",
|
||||
30L, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
if (bulkProcessor != null) {
|
||||
bulkProcessor.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
bulkMetric.close();
|
||||
flush();
|
||||
bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit);
|
||||
if (bulkProcessor != null) {
|
||||
bulkProcessor.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureActiveAndBulk() {
|
||||
if (!active.get()) {
|
||||
throw new IllegalStateException("inactive");
|
||||
}
|
||||
if (bulkProcessor == null) {
|
||||
throw new UnsupportedOperationException("bulk processor not present");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,18 +5,19 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.xbib.elx.api.BulkController;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.api.BulkListener;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import java.util.Arrays;
|
||||
import java.util.LongSummaryStatistics;
|
||||
import java.util.stream.LongStream;
|
||||
import org.xbib.elx.api.BulkProcessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
public class DefaultBulkListener implements BulkListener {
|
||||
|
||||
private final Logger logger = LogManager.getLogger(DefaultBulkListener.class.getName());
|
||||
|
||||
private final BulkController bulkController;
|
||||
private final BulkProcessor bulkProcessor;
|
||||
|
||||
private final BulkMetric bulkMetric;
|
||||
|
||||
|
@ -26,21 +27,22 @@ public class DefaultBulkListener implements BulkListener {
|
|||
|
||||
private Throwable lastBulkError;
|
||||
|
||||
private final int responseTimeCount;
|
||||
public DefaultBulkListener(DefaultBulkProcessor bulkProcessor,
|
||||
ScheduledThreadPoolExecutor scheduler,
|
||||
Settings settings) {
|
||||
this.bulkProcessor = bulkProcessor;
|
||||
boolean enableBulkLogging = settings.getAsBoolean(Parameters.BULK_LOGGING_ENABLED.getName(),
|
||||
Parameters.BULK_LOGGING_ENABLED.getBoolean());
|
||||
boolean failOnBulkError = settings.getAsBoolean(Parameters.BULK_FAIL_ON_ERROR.getName(),
|
||||
Parameters.BULK_FAIL_ON_ERROR.getBoolean());
|
||||
this.isBulkLoggingEnabled = enableBulkLogging;
|
||||
this.failOnError = failOnBulkError;
|
||||
this.bulkMetric = new DefaultBulkMetric(bulkProcessor, scheduler, settings);
|
||||
bulkMetric.start();
|
||||
}
|
||||
|
||||
private final LastResponseTimes responseTimes;
|
||||
|
||||
public DefaultBulkListener(BulkController bulkController,
|
||||
BulkMetric bulkMetric,
|
||||
boolean isBulkLoggingEnabled,
|
||||
boolean failOnError,
|
||||
int responseTimeCount) {
|
||||
this.bulkController = bulkController;
|
||||
this.bulkMetric = bulkMetric;
|
||||
this.isBulkLoggingEnabled = isBulkLoggingEnabled;
|
||||
this.failOnError = failOnError;
|
||||
this.responseTimeCount = responseTimeCount;
|
||||
this.responseTimes = new LastResponseTimes(responseTimeCount);
|
||||
public BulkMetric getBulkMetric() {
|
||||
return bulkMetric;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,7 +54,7 @@ public class DefaultBulkListener implements BulkListener {
|
|||
bulkMetric.getCurrentIngestNumDocs().inc(n);
|
||||
bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes());
|
||||
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
||||
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
|
||||
logger.debug("before bulk [{}] [actions={}] [bytes={}] [requests={}]",
|
||||
executionId,
|
||||
request.numberOfActions(),
|
||||
request.estimatedSizeInBytes(),
|
||||
|
@ -62,20 +64,11 @@ public class DefaultBulkListener implements BulkListener {
|
|||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
bulkMetric.recalculate(request, response);
|
||||
long l = bulkMetric.getCurrentIngest().getCount();
|
||||
bulkMetric.getCurrentIngest().dec();
|
||||
bulkMetric.getSucceeded().inc(response.getItems().length);
|
||||
bulkMetric.markTotalIngest(response.getItems().length);
|
||||
if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) {
|
||||
LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics();
|
||||
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
||||
logger.debug("bulk response millis: avg = " + stat.getAverage() +
|
||||
" min =" + stat.getMin() +
|
||||
" max = " + stat.getMax() +
|
||||
" actions = " + bulkController.getBulkProcessor().getBulkActions() +
|
||||
" size = " + bulkController.getBulkProcessor().getBulkSize());
|
||||
}
|
||||
}
|
||||
int n = 0;
|
||||
for (BulkItemResponse itemResponse : response.getItems()) {
|
||||
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
|
||||
|
@ -86,7 +79,7 @@ public class DefaultBulkListener implements BulkListener {
|
|||
}
|
||||
}
|
||||
if (isBulkLoggingEnabled && logger.isDebugEnabled()) {
|
||||
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]",
|
||||
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [requests={}]",
|
||||
executionId,
|
||||
bulkMetric.getSucceeded().getCount(),
|
||||
bulkMetric.getFailed().getCount(),
|
||||
|
@ -114,7 +107,7 @@ public class DefaultBulkListener implements BulkListener {
|
|||
if (logger.isErrorEnabled()) {
|
||||
logger.error("after bulk [" + executionId + "] error", failure);
|
||||
}
|
||||
bulkController.inactivate();
|
||||
bulkProcessor.setEnabled(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,29 +115,8 @@ public class DefaultBulkListener implements BulkListener {
|
|||
return lastBulkError;
|
||||
}
|
||||
|
||||
private static class LastResponseTimes {
|
||||
|
||||
private final Long[] values;
|
||||
|
||||
private final int limit;
|
||||
|
||||
private int index;
|
||||
|
||||
public LastResponseTimes(int limit) {
|
||||
this.values = new Long[limit];
|
||||
Arrays.fill(values, -1L);
|
||||
this.limit = limit;
|
||||
this.index = 0;
|
||||
}
|
||||
|
||||
public int add(Long value) {
|
||||
int i = index++ % limit;
|
||||
values[i] = value;
|
||||
return i;
|
||||
}
|
||||
|
||||
public LongStream longStream() {
|
||||
return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue);
|
||||
}
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
bulkMetric.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,33 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
import org.xbib.metrics.common.CountMetric;
|
||||
import org.xbib.metrics.common.Meter;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.io.IOException;
|
||||
import java.util.LongSummaryStatistics;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DefaultBulkMetric implements BulkMetric {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(DefaultBulkMetric.class.getName());
|
||||
|
||||
private final DefaultBulkProcessor bulkProcessor;
|
||||
|
||||
private final ScheduledFuture<?> future;
|
||||
|
||||
private final Meter totalIngest;
|
||||
|
||||
private final Count totalIngestSizeInBytes;
|
||||
|
@ -25,23 +42,58 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
|
||||
private final Count failed;
|
||||
|
||||
private final long measureIntervalSeconds;
|
||||
|
||||
private final int ringBufferSize;
|
||||
|
||||
private final LongRingBuffer ringBuffer;
|
||||
|
||||
private final long minVolumePerRequest;
|
||||
|
||||
private long maxVolumePerRequest;
|
||||
|
||||
private Long started;
|
||||
|
||||
private Long stopped;
|
||||
|
||||
public DefaultBulkMetric() {
|
||||
totalIngest = new Meter(Executors.newSingleThreadScheduledExecutor());
|
||||
totalIngestSizeInBytes = new CountMetric();
|
||||
currentIngest = new CountMetric();
|
||||
currentIngestNumDocs = new CountMetric();
|
||||
submitted = new CountMetric();
|
||||
succeeded = new CountMetric();
|
||||
failed = new CountMetric();
|
||||
}
|
||||
private Double lastThroughput;
|
||||
|
||||
@Override
|
||||
public void init(Settings settings) {
|
||||
start();
|
||||
private long currentVolumePerRequest;
|
||||
|
||||
private int x = 0;
|
||||
|
||||
public DefaultBulkMetric(DefaultBulkProcessor bulkProcessor,
|
||||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
|
||||
Settings settings) {
|
||||
this.bulkProcessor = bulkProcessor;
|
||||
int ringBufferSize = settings.getAsInt(Parameters.BULK_RING_BUFFER_SIZE.getName(),
|
||||
Parameters.BULK_RING_BUFFER_SIZE.getInteger());
|
||||
String measureIntervalStr = settings.get(Parameters.BULK_MEASURE_INTERVAL.getName(),
|
||||
Parameters.BULK_MEASURE_INTERVAL.getString());
|
||||
TimeValue measureInterval = TimeValue.parseTimeValue(measureIntervalStr,
|
||||
TimeValue.timeValueSeconds(1), "");
|
||||
this.measureIntervalSeconds = measureInterval.seconds();
|
||||
this.totalIngest = new Meter(scheduledThreadPoolExecutor);
|
||||
this.ringBufferSize = ringBufferSize;
|
||||
this.ringBuffer = new LongRingBuffer(ringBufferSize);
|
||||
this.totalIngestSizeInBytes = new CountMetric();
|
||||
this.currentIngest = new CountMetric();
|
||||
this.currentIngestNumDocs = new CountMetric();
|
||||
this.submitted = new CountMetric();
|
||||
this.succeeded = new CountMetric();
|
||||
this.failed = new CountMetric();
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
this.minVolumePerRequest = minVolumePerRequest.getBytes();
|
||||
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MAX_VOLUME_PER_REQUEST.getString(), "1m"));
|
||||
this.maxVolumePerRequest = maxVolumePerRequest.getBytes();
|
||||
this.currentVolumePerRequest = minVolumePerRequest.getBytes();
|
||||
String metricLogIntervalStr = settings.get(Parameters.BULK_METRIC_LOG_INTERVAL.getName(),
|
||||
Parameters.BULK_METRIC_LOG_INTERVAL.getString());
|
||||
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
|
||||
TimeValue.timeValueSeconds(10), "");
|
||||
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,7 +144,7 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
@Override
|
||||
public void start() {
|
||||
this.started = System.nanoTime();
|
||||
totalIngest.start(5L);
|
||||
totalIngest.start(measureIntervalSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,8 +154,87 @@ public class DefaultBulkMetric implements BulkMetric {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
public void close() throws IOException {
|
||||
stop();
|
||||
totalIngest.shutdown();
|
||||
log();
|
||||
this.future.cancel(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recalculate(BulkRequest request, BulkResponse response) {
|
||||
if (ringBufferSize > 0 && ringBuffer.add(response.getTook().millis(), request.estimatedSizeInBytes()) == 0) {
|
||||
x++;
|
||||
LongSummaryStatistics stat1 = ringBuffer.longStreamValues1().summaryStatistics();
|
||||
LongSummaryStatistics stat2 = ringBuffer.longStreamValues2().summaryStatistics();
|
||||
double throughput = stat2.getAverage() / stat1.getAverage();
|
||||
double delta = lastThroughput != null ? throughput - lastThroughput : 0.0d;
|
||||
double deltaPercent = delta * 100 / throughput;
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("time: avg = " + stat1.getAverage() +
|
||||
" min = " + stat1.getMin() +
|
||||
" max = " + stat1.getMax() +
|
||||
" size: avg = " + stat2.getAverage() +
|
||||
" min = " + stat2.getMin() +
|
||||
" max = " + stat2.getMax() +
|
||||
" last throughput: " + lastThroughput + " bytes/ms" +
|
||||
" throughput: " + throughput + " bytes/ms" +
|
||||
" delta = " + delta +
|
||||
" deltapercent = " + deltaPercent +
|
||||
" vol = " + currentVolumePerRequest);
|
||||
}
|
||||
if ((lastThroughput == null || throughput < 1000000) && stat1.getAverage() < 5000) {
|
||||
double k = 0.5;
|
||||
double d = (1 / (1 + Math.exp(-(((double)x)) * k)));
|
||||
currentVolumePerRequest += d * currentVolumePerRequest;
|
||||
if (currentVolumePerRequest > maxVolumePerRequest) {
|
||||
currentVolumePerRequest = maxVolumePerRequest;
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("increasing volume to " + currentVolumePerRequest + " max volume = " + maxVolumePerRequest);
|
||||
}
|
||||
}
|
||||
bulkProcessor.setMaxBulkVolume(currentVolumePerRequest);
|
||||
} else if (stat1.getAverage() >= 5000) {
|
||||
if (currentVolumePerRequest == maxVolumePerRequest) {
|
||||
// subtract 10% from max
|
||||
this.maxVolumePerRequest -= (maxVolumePerRequest / 10);
|
||||
if (maxVolumePerRequest < 1024) {
|
||||
maxVolumePerRequest = 1024;
|
||||
}
|
||||
}
|
||||
// fall back to minimal volume
|
||||
currentVolumePerRequest = minVolumePerRequest;
|
||||
bulkProcessor.setMaxBulkVolume(currentVolumePerRequest);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("decreasing volume to " + currentVolumePerRequest + " new max volume = " + maxVolumePerRequest);
|
||||
}
|
||||
}
|
||||
lastThroughput = throughput;
|
||||
}
|
||||
}
|
||||
|
||||
private void log() {
|
||||
long docs = getSucceeded().getCount();
|
||||
long elapsed = elapsed() / 1000000; // nano to millis
|
||||
double dps = docs * 1000.0 / elapsed;
|
||||
long bytes = getTotalIngestSizeInBytes().getCount();
|
||||
double avg = bytes / (docs + 1.0); // avoid div by zero
|
||||
double bps = bytes * 1000.0 / elapsed;
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.log(Level.INFO, "{} docs, {} ms = {}, {} = {}, {} = {} avg, {} = {}, {} = {}",
|
||||
docs,
|
||||
elapsed,
|
||||
FormatUtil.formatDurationWords(elapsed, true, true),
|
||||
bytes,
|
||||
FormatUtil.formatSize(bytes),
|
||||
avg,
|
||||
FormatUtil.formatSize(avg),
|
||||
dps,
|
||||
FormatUtil.formatDocumentSpeed(dps),
|
||||
bps,
|
||||
FormatUtil.formatSpeed(bps)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
|
@ -7,21 +9,18 @@ import org.elasticsearch.action.bulk.BulkRequest;
|
|||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.xbib.elx.api.BulkListener;
|
||||
import org.xbib.elx.api.BulkClient;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.elx.api.BulkProcessor;
|
||||
import org.xbib.elx.api.BulkRequestHandler;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
|
@ -30,367 +29,215 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
* (either based on number of actions, based on the size, or time), and
|
||||
* to easily control the number of concurrent bulk
|
||||
* requests allowed to be executed in parallel.
|
||||
* In order to create a new bulk processor, use the {@link Builder}.
|
||||
*/
|
||||
public class DefaultBulkProcessor implements BulkProcessor {
|
||||
|
||||
private final ScheduledThreadPoolExecutor scheduler;
|
||||
private static final Logger logger = LogManager.getLogger(DefaultBulkProcessor.class);
|
||||
|
||||
private final ScheduledFuture<?> scheduledFuture;
|
||||
private final BulkClient bulkClient;
|
||||
|
||||
private final AtomicLong executionIdGen;
|
||||
private final AtomicBoolean enabled;
|
||||
|
||||
private final BulkRequestHandler bulkRequestHandler;
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final DefaultBulkListener bulkListener;
|
||||
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
|
||||
private BulkRequest bulkRequest;
|
||||
|
||||
private long bulkVolume;
|
||||
|
||||
private int bulkActions;
|
||||
|
||||
private long bulkSize;
|
||||
private final AtomicBoolean closed;
|
||||
|
||||
private volatile boolean closed;
|
||||
private final AtomicLong executionIdGen;
|
||||
|
||||
private DefaultBulkProcessor(ElasticsearchClient client,
|
||||
BulkListener bulkListener,
|
||||
String name,
|
||||
int concurrentRequests,
|
||||
int bulkActions,
|
||||
ByteSizeValue bulkSize,
|
||||
TimeValue flushInterval) {
|
||||
this.executionIdGen = new AtomicLong();
|
||||
this.closed = false;
|
||||
this.bulkActions = bulkActions;
|
||||
this.bulkSize = bulkSize.getBytes();
|
||||
this.bulkRequest = new BulkRequest();
|
||||
this.bulkRequestHandler = concurrentRequests == 0 ?
|
||||
new SyncBulkRequestHandler(client, bulkListener) :
|
||||
new AsyncBulkRequestHandler(client, bulkListener, concurrentRequests);
|
||||
if (flushInterval != null) {
|
||||
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
|
||||
EsExecutors.daemonThreadFactory(Settings.EMPTY,
|
||||
name != null ? "[" + name + "]" : "" + "bulk_processor"));
|
||||
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(),
|
||||
private final ResizeableSemaphore semaphore;
|
||||
|
||||
private final int permits;
|
||||
|
||||
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
|
||||
this.bulkClient = bulkClient;
|
||||
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
|
||||
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
|
||||
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
|
||||
Parameters.BULK_FLUSH_INTERVAL.getString());
|
||||
TimeValue flushInterval = TimeValue.parseTimeValue(flushIntervalStr,
|
||||
TimeValue.timeValueSeconds(30), "");
|
||||
ByteSizeValue minVolumePerRequest = settings.getAsBytesSize(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getName(),
|
||||
ByteSizeValue.parseBytesSizeValue(Parameters.BULK_MIN_VOLUME_PER_REQUEST.getString(), "1k"));
|
||||
this.client = bulkClient.getClient();
|
||||
if (flushInterval.millis() > 0L) {
|
||||
this.scheduledFuture = bulkClient.getScheduler().scheduleWithFixedDelay(this::flush, flushInterval.millis(),
|
||||
flushInterval.millis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
this.bulkListener = new DefaultBulkListener(this, bulkClient.getScheduler(), settings);
|
||||
this.bulkActions = maxActionsPerRequest;
|
||||
this.bulkVolume = minVolumePerRequest.getBytes();
|
||||
this.bulkRequest = new BulkRequest();
|
||||
this.closed = new AtomicBoolean(false);
|
||||
this.enabled = new AtomicBoolean(false);
|
||||
this.executionIdGen = new AtomicLong();
|
||||
this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger());
|
||||
if (permits < 1) {
|
||||
throw new IllegalArgumentException("must not be less 1 permits for bulk indexing");
|
||||
}
|
||||
this.semaphore = new ResizeableSemaphore(permits);
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("bulk processor now active");
|
||||
}
|
||||
setEnabled(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled.set(enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
|
||||
String indexName = indexDefinition.getFullIndexName();
|
||||
int interval = indexDefinition.getStartBulkRefreshSeconds();
|
||||
if (interval != 0) {
|
||||
logger.info("starting bulk on " + indexName + " with new refresh interval " + interval);
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
|
||||
} else {
|
||||
this.scheduler = null;
|
||||
this.scheduledFuture = null;
|
||||
logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval);
|
||||
}
|
||||
}
|
||||
|
||||
public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) {
|
||||
Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null");
|
||||
return new Builder(client, bulkListener);
|
||||
@Override
|
||||
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
|
||||
String indexName = indexDefinition.getFullIndexName();
|
||||
int interval = indexDefinition.getStopBulkRefreshSeconds();
|
||||
flush();
|
||||
if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
|
||||
if (interval != 0) {
|
||||
logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval);
|
||||
bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS);
|
||||
} else {
|
||||
logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBulkActions(int bulkActions) {
|
||||
public void setMaxBulkActions(int bulkActions) {
|
||||
this.bulkActions = bulkActions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBulkActions() {
|
||||
public int getMaxBulkActions() {
|
||||
return bulkActions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBulkSize(long bulkSize) {
|
||||
this.bulkSize = bulkSize;
|
||||
public void setMaxBulkVolume(long bulkVolume) {
|
||||
this.bulkVolume = bulkVolume;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBulkSize() {
|
||||
return bulkSize;
|
||||
public long getMaxBulkVolume() {
|
||||
return bulkVolume;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for bulk request handler with flush.
|
||||
* @param timeout the timeout value
|
||||
* @param unit the timeout unit
|
||||
* @return true is method was successful, false if timeout
|
||||
* @throws InterruptedException if timeout
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
Objects.requireNonNull(unit, "A time unit is required for awaitFlush() but null");
|
||||
if (closed) {
|
||||
return true;
|
||||
}
|
||||
// flush
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
execute();
|
||||
}
|
||||
// wait for all bulk responses
|
||||
return bulkRequestHandler.close(timeout, unit);
|
||||
public BulkMetric getBulkMetric() {
|
||||
return bulkListener.getBulkMetric();
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called
|
||||
* once as the last action of a bulk processor.
|
||||
*
|
||||
* If concurrent requests are not enabled, returns {@code true} immediately.
|
||||
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then
|
||||
* returns {@code true},
|
||||
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
|
||||
*
|
||||
* @param timeout The maximum time to wait for the bulk requests to complete
|
||||
* @param unit The time unit of the {@code timeout} argument
|
||||
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the
|
||||
* bulk requests completed
|
||||
* @throws InterruptedException If the current thread is interrupted
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
Objects.requireNonNull(unit, "A time unit is required for awaitCLose() but null");
|
||||
if (closed) {
|
||||
return true;
|
||||
}
|
||||
closed = true;
|
||||
if (scheduledFuture != null) {
|
||||
FutureUtils.cancel(scheduledFuture);
|
||||
scheduler.shutdown();
|
||||
}
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
execute();
|
||||
}
|
||||
return bulkRequestHandler.close(timeout, unit);
|
||||
public Throwable getLastBulkError() {
|
||||
return bulkListener.getLastBulkError();
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds either a delete or an index request.
|
||||
*
|
||||
* @param request request
|
||||
* @return his bulk processor
|
||||
*/
|
||||
@Override
|
||||
public synchronized DefaultBulkProcessor add(DocWriteRequest<?> request) {
|
||||
ensureOpen();
|
||||
public synchronized void add(DocWriteRequest<?> request) {
|
||||
ensureOpenAndActive();
|
||||
bulkRequest.add(request);
|
||||
if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) ||
|
||||
(bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) {
|
||||
(bulkVolume != -1 && bulkRequest.estimatedSizeInBytes() >= bulkVolume)) {
|
||||
execute();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush pending delete or index requests.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void flush() {
|
||||
ensureOpen();
|
||||
ensureOpenAndActive();
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
execute();
|
||||
}
|
||||
// do not drain semaphore
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
|
||||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
public synchronized boolean waitForBulkResponses(long timeout, TimeUnit unit) {
|
||||
try {
|
||||
// 0 = immediate close
|
||||
awaitClose(0, TimeUnit.NANOSECONDS);
|
||||
if (closed.get()) {
|
||||
// silently skip closed condition
|
||||
return true;
|
||||
}
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
execute();
|
||||
}
|
||||
return drainSemaphore(timeout, unit);
|
||||
|
||||
} catch (InterruptedException exc) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("interrupted while waiting for bulk responses");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureOpen() {
|
||||
if (closed) {
|
||||
throw new IllegalStateException("bulk processor already closed");
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
try {
|
||||
if (scheduledFuture != null) {
|
||||
scheduledFuture.cancel(true);
|
||||
}
|
||||
// like flush but without ensuring open
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
execute();
|
||||
}
|
||||
drainSemaphore(0L, TimeUnit.NANOSECONDS);
|
||||
bulkListener.close();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void execute() {
|
||||
BulkRequest myBulkRequest = this.bulkRequest;
|
||||
long executionId = executionIdGen.incrementAndGet();
|
||||
this.bulkRequest = new BulkRequest();
|
||||
this.bulkRequestHandler.execute(myBulkRequest, executionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* A builder used to create a build an instance of a bulk processor.
|
||||
*/
|
||||
public static class Builder {
|
||||
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final BulkListener bulkListener;
|
||||
|
||||
private String name;
|
||||
|
||||
private int concurrentRequests = 1;
|
||||
|
||||
private int bulkActions = 1000;
|
||||
|
||||
private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB);
|
||||
|
||||
private TimeValue flushInterval = null;
|
||||
|
||||
/**
|
||||
* Creates a builder of bulk processor with the client to use and the listener that will be used
|
||||
* to be notified on the completion of bulk requests.
|
||||
*
|
||||
* @param client the client
|
||||
* @param bulkListener the listener
|
||||
*/
|
||||
Builder(ElasticsearchClient client, BulkListener bulkListener) {
|
||||
this.client = client;
|
||||
this.bulkListener = bulkListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets an optional name to identify this bulk processor.
|
||||
*
|
||||
* @param name name
|
||||
* @return this builder
|
||||
*/
|
||||
public Builder setName(String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
|
||||
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
|
||||
* while accumulating new bulk requests. Defaults to {@code 1}.
|
||||
*
|
||||
* @param concurrentRequests maximum number of concurrent requests
|
||||
* @return this builder
|
||||
*/
|
||||
public Builder setConcurrentRequests(int concurrentRequests) {
|
||||
this.concurrentRequests = concurrentRequests;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
|
||||
* {@code 1000}. Can be set to {@code -1} to disable it.
|
||||
*
|
||||
* @param bulkActions bulk actions
|
||||
* @return this builder
|
||||
*/
|
||||
public Builder setBulkActions(int bulkActions) {
|
||||
this.bulkActions = bulkActions;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
|
||||
* {@code 5mb}. Can be set to {@code -1} to disable it.
|
||||
*
|
||||
* @param bulkSize bulk size
|
||||
* @return this builder
|
||||
*/
|
||||
public Builder setBulkSize(ByteSizeValue bulkSize) {
|
||||
this.bulkSize = bulkSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
|
||||
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
|
||||
* can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions.
|
||||
*
|
||||
* @param flushInterval flush interval
|
||||
* @return this builder
|
||||
*/
|
||||
public Builder setFlushInterval(TimeValue flushInterval) {
|
||||
this.flushInterval = flushInterval;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a new bulk processor.
|
||||
*
|
||||
* @return a bulk processor
|
||||
*/
|
||||
public DefaultBulkProcessor build() {
|
||||
return new DefaultBulkProcessor(client, bulkListener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
|
||||
}
|
||||
}
|
||||
|
||||
private class Flush implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultBulkProcessor.this) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
if (bulkRequest.numberOfActions() == 0) {
|
||||
return;
|
||||
}
|
||||
execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class SyncBulkRequestHandler implements BulkRequestHandler {
|
||||
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final BulkListener bulkListener;
|
||||
|
||||
SyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener) {
|
||||
this.client = client;
|
||||
this.bulkListener = bulkListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(BulkRequest bulkRequest, long executionId) {
|
||||
long executionId = executionIdGen.incrementAndGet();
|
||||
if (semaphore == null) {
|
||||
boolean afterCalled = false;
|
||||
try {
|
||||
bulkListener.beforeBulk(executionId, bulkRequest);
|
||||
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
|
||||
bulkListener.beforeBulk(executionId, myBulkRequest);
|
||||
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, myBulkRequest).actionGet();
|
||||
afterCalled = true;
|
||||
bulkListener.afterBulk(executionId, bulkRequest, bulkResponse);
|
||||
bulkListener.afterBulk(executionId, myBulkRequest, bulkResponse);
|
||||
} catch (Exception e) {
|
||||
if (!afterCalled) {
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean close(long timeout, TimeUnit unit) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static class AsyncBulkRequestHandler implements BulkRequestHandler {
|
||||
|
||||
private final ElasticsearchClient client;
|
||||
|
||||
private final BulkListener bulkListener;
|
||||
|
||||
private final Semaphore semaphore;
|
||||
|
||||
private final int concurrentRequests;
|
||||
|
||||
private AsyncBulkRequestHandler(ElasticsearchClient client, BulkListener bulkListener, int concurrentRequests) {
|
||||
this.client = client;
|
||||
this.bulkListener = bulkListener;
|
||||
this.concurrentRequests = concurrentRequests;
|
||||
this.semaphore = new Semaphore(concurrentRequests);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(BulkRequest bulkRequest, long executionId) {
|
||||
} else {
|
||||
boolean bulkRequestSetupSuccessful = false;
|
||||
boolean acquired = false;
|
||||
try {
|
||||
bulkListener.beforeBulk(executionId, bulkRequest);
|
||||
bulkListener.beforeBulk(executionId, myBulkRequest);
|
||||
semaphore.acquire();
|
||||
acquired = true;
|
||||
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
|
||||
client.execute(BulkAction.INSTANCE, myBulkRequest, new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
try {
|
||||
bulkListener.afterBulk(executionId, bulkRequest, response);
|
||||
bulkListener.afterBulk(executionId, myBulkRequest, response);
|
||||
} finally {
|
||||
semaphore.release();
|
||||
}
|
||||
|
@ -399,7 +246,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
||||
} finally {
|
||||
semaphore.release();
|
||||
}
|
||||
|
@ -408,24 +255,50 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
bulkRequestSetupSuccessful = true;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
||||
} catch (Exception e) {
|
||||
bulkListener.afterBulk(executionId, bulkRequest, e);
|
||||
bulkListener.afterBulk(executionId, myBulkRequest, e);
|
||||
} finally {
|
||||
if (!bulkRequestSetupSuccessful && acquired) {
|
||||
// if we fail on client.bulk() release the semaphore
|
||||
semaphore.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean drainSemaphore(long timeValue, TimeUnit timeUnit) throws InterruptedException {
|
||||
if (semaphore != null) {
|
||||
if (permits <= 0) {
|
||||
return true;
|
||||
} else {
|
||||
if (semaphore.tryAcquire(permits, timeValue, timeUnit)) {
|
||||
semaphore.release(permits);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void ensureOpenAndActive() {
|
||||
if (closed.get()) {
|
||||
throw new IllegalStateException("bulk processor is closed");
|
||||
}
|
||||
if (!enabled.get()) {
|
||||
throw new IllegalStateException("bulk processor is no longer enabled");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private static class ResizeableSemaphore extends Semaphore {
|
||||
|
||||
ResizeableSemaphore(int permits) {
|
||||
super(permits, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
|
||||
semaphore.release(concurrentRequests);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
protected void reducePermits(int reduction) {
|
||||
super.reducePermits(reduction);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()));
|
||||
setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$"));
|
||||
setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now()));
|
||||
setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS);
|
||||
setMaxWaitTime(30, TimeUnit.SECONDS);
|
||||
setShift(false);
|
||||
setPrune(false);
|
||||
setEnabled(true);
|
||||
|
@ -76,7 +76,9 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
|
||||
public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings)
|
||||
throws IOException {
|
||||
TimeValue timeValue = settings.getAsTime(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), TimeValue.timeValueSeconds(30));
|
||||
String timeValueStr = settings.get(Parameters.BULK_MAX_WAIT_RESPONSE.getName(),
|
||||
Parameters.BULK_MAX_WAIT_RESPONSE.getString());
|
||||
TimeValue timeValue = TimeValue.parseTimeValue(timeValueStr, TimeValue.timeValueSeconds(30), "");
|
||||
setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS);
|
||||
String indexName = settings.get("name", index);
|
||||
String indexType = settings.get("type", type);
|
||||
|
@ -86,11 +88,13 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
setEnabled(enabled);
|
||||
String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName);
|
||||
setFullIndexName(fullIndexName);
|
||||
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_START_REFRESH_SECONDS.getName(),
|
||||
Parameters.BULK_START_REFRESH_SECONDS.getInteger()));
|
||||
setStopBulkRefreshSeconds(settings.getAsInt(Parameters.BULK_STOP_REFRESH_SECONDS.getName(),
|
||||
Parameters.BULK_STOP_REFRESH_SECONDS.getInteger()));
|
||||
if (settings.get("settings") != null && settings.get("mapping") != null) {
|
||||
setSettings(findSettingsFrom(settings.get("settings")));
|
||||
setMappings(findMappingsFrom(settings.get("mapping")));
|
||||
setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1));
|
||||
setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1));
|
||||
setReplicaLevel(settings.getAsInt("replica", 0));
|
||||
boolean shift = settings.getAsBoolean("shift", false);
|
||||
setShift(shift);
|
||||
|
@ -100,7 +104,7 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault())
|
||||
.withZone(ZoneId.systemDefault());
|
||||
setDateTimeFormatter(dateTimeFormatter);
|
||||
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$");
|
||||
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\d+)$");
|
||||
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
|
||||
setDateTimePattern(dateTimePattern);
|
||||
String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now());
|
||||
|
@ -195,20 +199,24 @@ public class DefaultIndexDefinition implements IndexDefinition {
|
|||
return pattern;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexDefinition setStartBulkRefreshSeconds(int seconds) {
|
||||
this.startRefreshInterval = seconds;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStartBulkRefreshSeconds() {
|
||||
return startRefreshInterval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexDefinition setStopBulkRefreshSeconds(int seconds) {
|
||||
this.stopRefreshInterval = seconds;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStopBulkRefreshSeconds() {
|
||||
return stopRefreshInterval;
|
||||
}
|
||||
|
|
|
@ -1,15 +1,24 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.xbib.elx.api.SearchMetric;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
import org.xbib.metrics.common.CountMetric;
|
||||
import org.xbib.metrics.common.Meter;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DefaultSearchMetric implements SearchMetric {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(DefaultSearchMetric.class.getName());
|
||||
|
||||
private final ScheduledFuture<?> future;
|
||||
|
||||
private final Meter totalQuery;
|
||||
|
||||
private final Count currentQuery;
|
||||
|
@ -28,14 +37,20 @@ public class DefaultSearchMetric implements SearchMetric {
|
|||
|
||||
private Long stopped;
|
||||
|
||||
public DefaultSearchMetric() {
|
||||
totalQuery = new Meter(Executors.newSingleThreadScheduledExecutor());
|
||||
public DefaultSearchMetric(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
|
||||
Settings settings) {
|
||||
totalQuery = new Meter(scheduledThreadPoolExecutor);
|
||||
currentQuery = new CountMetric();
|
||||
queries = new CountMetric();
|
||||
succeededQueries = new CountMetric();
|
||||
emptyQueries = new CountMetric();
|
||||
failedQueries = new CountMetric();
|
||||
timeoutQueries = new CountMetric();
|
||||
String metricLogIntervalStr = settings.get(Parameters.SEARCH_METRIC_LOG_INTERVAL.getName(),
|
||||
Parameters.SEARCH_METRIC_LOG_INTERVAL.getString());
|
||||
TimeValue metricLoginterval = TimeValue.parseTimeValue(metricLogIntervalStr,
|
||||
TimeValue.timeValueSeconds(10), "");
|
||||
this.future = scheduledThreadPoolExecutor.scheduleAtFixedRate(this::log, 0L, metricLoginterval.seconds(), TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,6 +113,8 @@ public class DefaultSearchMetric implements SearchMetric {
|
|||
public void stop() {
|
||||
this.stopped = System.nanoTime();
|
||||
totalQuery.stop();
|
||||
log();
|
||||
this.future.cancel(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,4 +122,10 @@ public class DefaultSearchMetric implements SearchMetric {
|
|||
stop();
|
||||
totalQuery.shutdown();
|
||||
}
|
||||
|
||||
private void log() {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("docs = " + getTotalQueries().getCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
436
elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java
Normal file
436
elx-common/src/main/java/org/xbib/elx/common/FormatUtil.java
Normal file
|
@ -0,0 +1,436 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import org.xbib.time.pretty.PrettyTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Taken from org,apache.commons.lang.DurationFormatUtils of Apache commons-lang.
|
||||
*/
|
||||
public class FormatUtil {
|
||||
|
||||
private static final PrettyTime pretty = new PrettyTime();
|
||||
|
||||
private static final String EMPTY = "";
|
||||
private static final String YEAR = "y";
|
||||
private static final String MONTH = "M";
|
||||
private static final String DAY = "d";
|
||||
private static final String HOUR = "H";
|
||||
private static final String MINUTE = "m";
|
||||
private static final String SECOND = "s";
|
||||
private static final String MILLISECOND = "S";
|
||||
|
||||
/**
|
||||
* Number of milliseconds in a standard second.
|
||||
*/
|
||||
private static final long MILLIS_PER_SECOND = 1000;
|
||||
/**
|
||||
* Number of milliseconds in a standard minute.
|
||||
*/
|
||||
private static final long MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND;
|
||||
/**
|
||||
* Number of milliseconds in a standard hour.
|
||||
*/
|
||||
private static final long MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE;
|
||||
/**
|
||||
* Number of milliseconds in a standard day.
|
||||
*/
|
||||
private static final long MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR;
|
||||
|
||||
private static final String[] BYTES = {
|
||||
" B", " kB", " MB", " GB", " TB", " PB", " EB", " ZB", " YB"
|
||||
};
|
||||
private static final String[] BYTES_PER_SECOND = {
|
||||
" B/s", " kB/s", " MB/s", " GB/s", " TB/s", " PB/s", " EB/s", " ZB/s", " YB/s"
|
||||
};
|
||||
private static final String[] DOCS_PER_SECOND = {
|
||||
" dps", " kdps", " Mdps", " Gdps", " Tdps", " Pdps", " Edps", " Zdps", " Ydps"
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Format byte size (file size as example) into a string,
|
||||
* with two digits after dot and actual measure (MB, GB or other).
|
||||
*
|
||||
* @param size value to format
|
||||
* @return formatted string in bytes, kB, MB or other.
|
||||
*/
|
||||
public static String formatSize(long size) {
|
||||
return format(size, BYTES, 1024);
|
||||
}
|
||||
|
||||
public static String formatSize(double size) {
|
||||
return format(size, BYTES, 1024);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format speed values (copy speed as example) into a string
|
||||
* with two digits after dot and actual measure (MB/s, GB/s or other).
|
||||
*
|
||||
* @param speed value to format
|
||||
* @return formatted string in bytes/s, kB/s, MB/s or other.
|
||||
*/
|
||||
public static String formatSpeed(long speed) {
|
||||
return format(speed, BYTES_PER_SECOND, 1024);
|
||||
}
|
||||
|
||||
public static String formatSpeed(double speed) {
|
||||
return format(speed, BYTES_PER_SECOND, 1024);
|
||||
}
|
||||
|
||||
public static String formatDocumentSpeed(long speed) {
|
||||
return format(speed, DOCS_PER_SECOND, 1024);
|
||||
}
|
||||
|
||||
public static String formatDocumentSpeed(double speed) {
|
||||
return format(speed, DOCS_PER_SECOND, 1024);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format any value without string appending.
|
||||
*
|
||||
* @param size value to format
|
||||
* @param measureUnits array of strings to use as measurement units. Use BYTES_PER_SECOND as example.
|
||||
* @param measureQuantity quantiry, required to step into next unit. Like 1024 for bytes,
|
||||
* 1000 for meters or 100 for century.
|
||||
* @return formatted size with measure unit
|
||||
*/
|
||||
private static String format(long size, String[] measureUnits, int measureQuantity) {
|
||||
if (size <= 0) {
|
||||
return null;
|
||||
}
|
||||
if (size < measureQuantity) {
|
||||
return size + measureUnits[0];
|
||||
}
|
||||
int i = 1;
|
||||
double d = size;
|
||||
while ((d = d / measureQuantity) > (measureQuantity - 1)) {
|
||||
i++;
|
||||
}
|
||||
long l = (long) (d * 100);
|
||||
d = (double) l / 100;
|
||||
if (i < measureUnits.length) {
|
||||
return d + measureUnits[i];
|
||||
}
|
||||
return String.valueOf(size);
|
||||
}
|
||||
|
||||
private static String format(double value, String[] measureUnits, int measureQuantity) {
|
||||
double d = value;
|
||||
if (d <= 0.0d) {
|
||||
return null;
|
||||
}
|
||||
if (d < measureQuantity) {
|
||||
return d + measureUnits[0];
|
||||
}
|
||||
int i = 1;
|
||||
while ((d = d / measureQuantity) > (measureQuantity - 1)) {
|
||||
i++;
|
||||
}
|
||||
long l = (long) (d * 100);
|
||||
d = (double) l / 100;
|
||||
if (i < measureUnits.length) {
|
||||
return d + measureUnits[i];
|
||||
}
|
||||
return String.valueOf(d);
|
||||
}
|
||||
|
||||
public static String formatMillis(long millis) {
|
||||
return pretty.format(pretty.calculateDuration(millis));
|
||||
}
|
||||
|
||||
public static String formatDurationWords(long value, boolean suppressLeadingZeroElements,
|
||||
boolean suppressTrailingZeroElements) {
|
||||
// This method is generally replacable by the format method, but
|
||||
// there are a series of tweaks and special cases that require
|
||||
// trickery to replicate.
|
||||
String duration = formatDuration(value, "d' days 'H' hours 'm' minutes 's' seconds'");
|
||||
if (suppressLeadingZeroElements) {
|
||||
// this is a temporary marker on the front. Like ^ in regexp.
|
||||
duration = " " + duration;
|
||||
String tmp = replaceOnce(duration, " 0 days", "");
|
||||
if (tmp.length() != duration.length()) {
|
||||
duration = tmp;
|
||||
tmp = replaceOnce(duration, " 0 hours", "");
|
||||
if (tmp.length() != duration.length()) {
|
||||
duration = tmp;
|
||||
tmp = replaceOnce(duration, " 0 minutes", "");
|
||||
if (tmp.length() != duration.length()) {
|
||||
duration = replaceOnce(tmp, " 0 seconds", "");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (duration.length() != 0) {
|
||||
// strip the space off again
|
||||
duration = duration.substring(1);
|
||||
}
|
||||
}
|
||||
if (suppressTrailingZeroElements) {
|
||||
String tmp = replaceOnce(duration, " 0 seconds", "");
|
||||
if (tmp != null && tmp.length() != duration.length()) {
|
||||
duration = tmp;
|
||||
tmp = replaceOnce(duration, " 0 minutes", "");
|
||||
if (tmp.length() != duration.length()) {
|
||||
duration = tmp;
|
||||
tmp = replaceOnce(duration, " 0 hours", "");
|
||||
if (tmp.length() != duration.length()) {
|
||||
duration = replaceOnce(tmp, " 0 days", "");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
duration = " " + duration;
|
||||
duration = replaceOnce(duration, " 1 seconds", " 1 second");
|
||||
duration = replaceOnce(duration, " 1 minutes", " 1 minute");
|
||||
duration = replaceOnce(duration, " 1 hours", " 1 hour");
|
||||
duration = replaceOnce(duration, " 1 days", " 1 day");
|
||||
return duration != null ? duration.trim() : null;
|
||||
}
|
||||
|
||||
public static String formatDuration(long millis, String format) {
|
||||
long durationMillis = millis;
|
||||
List<Token> tokens = lexx(format);
|
||||
int days = 0;
|
||||
int hours = 0;
|
||||
int minutes = 0;
|
||||
int seconds = 0;
|
||||
int milliseconds = 0;
|
||||
|
||||
if (Token.containsTokenWithValue(tokens, DAY)) {
|
||||
days = (int) (durationMillis / MILLIS_PER_DAY);
|
||||
durationMillis -= days * MILLIS_PER_DAY;
|
||||
}
|
||||
if (Token.containsTokenWithValue(tokens, HOUR)) {
|
||||
hours = (int) (durationMillis / MILLIS_PER_HOUR);
|
||||
durationMillis -= hours * MILLIS_PER_HOUR;
|
||||
}
|
||||
if (Token.containsTokenWithValue(tokens, MINUTE)) {
|
||||
minutes = (int) (durationMillis / MILLIS_PER_MINUTE);
|
||||
durationMillis -= minutes * MILLIS_PER_MINUTE;
|
||||
}
|
||||
if (Token.containsTokenWithValue(tokens, SECOND)) {
|
||||
seconds = (int) (durationMillis / MILLIS_PER_SECOND);
|
||||
durationMillis -= seconds * MILLIS_PER_SECOND;
|
||||
}
|
||||
if (Token.containsTokenWithValue(tokens, MILLISECOND)) {
|
||||
milliseconds = (int) durationMillis;
|
||||
}
|
||||
return format(tokens, days, hours, minutes, seconds, milliseconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The internal method to do the formatting.</p>
|
||||
*
|
||||
* @param tokens the tokens
|
||||
* @param days the number of days
|
||||
* @param hours the number of hours
|
||||
* @param minutes the number of minutes
|
||||
* @param seconds the number of seconds
|
||||
* @param millis the number of millis
|
||||
* @return the formatted string
|
||||
*/
|
||||
private static String format(List<Token> tokens,
|
||||
int days, int hours, int minutes, int seconds, int millis) {
|
||||
int milliseconds = millis;
|
||||
StringBuilder buffer = new StringBuilder();
|
||||
boolean lastOutputSeconds = false;
|
||||
for (Token token : tokens) {
|
||||
Object value = token.getValue();
|
||||
if (value instanceof StringBuilder) {
|
||||
buffer.append(value);
|
||||
} else {
|
||||
if (DAY.equals(value)) {
|
||||
buffer.append(days);
|
||||
lastOutputSeconds = false;
|
||||
} else if (HOUR.equals(value)) {
|
||||
buffer.append(hours);
|
||||
lastOutputSeconds = false;
|
||||
} else if (MINUTE.equals(value)) {
|
||||
buffer.append(minutes);
|
||||
lastOutputSeconds = false;
|
||||
} else if (SECOND.equals(value)) {
|
||||
buffer.append(seconds);
|
||||
lastOutputSeconds = true;
|
||||
} else if (MILLISECOND.equals(value)) {
|
||||
if (lastOutputSeconds) {
|
||||
milliseconds += 1000;
|
||||
String str = Integer.toString(milliseconds);
|
||||
buffer.append(str.substring(1));
|
||||
} else {
|
||||
buffer.append(milliseconds);
|
||||
}
|
||||
lastOutputSeconds = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a classic date format string into Tokens.
|
||||
*
|
||||
* @param format to parse
|
||||
* @return array of Token
|
||||
*/
|
||||
private static List<Token> lexx(String format) {
|
||||
char[] array = format.toCharArray();
|
||||
List<Token> list = new ArrayList<>(array.length);
|
||||
boolean inLiteral = false;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Token previous = null;
|
||||
for (char ch : array) {
|
||||
if (inLiteral && ch != '\'') {
|
||||
sb.append(ch);
|
||||
continue;
|
||||
}
|
||||
Object value = null;
|
||||
switch (ch) {
|
||||
case '\'':
|
||||
if (inLiteral) {
|
||||
sb = new StringBuilder();
|
||||
inLiteral = false;
|
||||
} else {
|
||||
sb = new StringBuilder();
|
||||
list.add(new Token(sb));
|
||||
inLiteral = true;
|
||||
}
|
||||
break;
|
||||
case 'y':
|
||||
value = YEAR;
|
||||
break;
|
||||
case 'M':
|
||||
value = MONTH;
|
||||
break;
|
||||
case 'd':
|
||||
value = DAY;
|
||||
break;
|
||||
case 'H':
|
||||
value = HOUR;
|
||||
break;
|
||||
case 'm':
|
||||
value = MINUTE;
|
||||
break;
|
||||
case 's':
|
||||
value = SECOND;
|
||||
break;
|
||||
case 'S':
|
||||
value = MILLISECOND;
|
||||
break;
|
||||
default:
|
||||
if (sb.length() == 0) {
|
||||
sb = new StringBuilder();
|
||||
list.add(new Token(sb));
|
||||
}
|
||||
sb.append(ch);
|
||||
}
|
||||
if (value != null) {
|
||||
if (previous != null && value.equals(previous.getValue())) {
|
||||
previous.increment();
|
||||
} else {
|
||||
Token token = new Token(value);
|
||||
list.add(token);
|
||||
previous = token;
|
||||
}
|
||||
sb.setLength(0);
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
private static String replaceOnce(String text, String searchString, String replacement) {
|
||||
return replace(text, searchString, replacement, 1);
|
||||
}
|
||||
|
||||
private static String replace(String text, String searchString, String replacement, int maxvalue) {
|
||||
int max = maxvalue;
|
||||
if (isNullOrEmpty(text) || isNullOrEmpty(searchString) || replacement == null || max == 0) {
|
||||
return text;
|
||||
}
|
||||
int start = 0;
|
||||
int end = text.indexOf(searchString, start);
|
||||
if (end == -1) {
|
||||
return text;
|
||||
}
|
||||
int replLength = searchString.length();
|
||||
int increase = replacement.length() - replLength;
|
||||
increase = Math.max(increase, 0);
|
||||
increase *= (max < 0 ? 16 : (Math.min(max, 64)));
|
||||
StringBuilder buf = new StringBuilder(text.length() + increase);
|
||||
while (end != -1) {
|
||||
buf.append(text, start, end).append(replacement);
|
||||
start = end + replLength;
|
||||
if (--max == 0) {
|
||||
break;
|
||||
}
|
||||
end = text.indexOf(searchString, start);
|
||||
}
|
||||
buf.append(text.substring(start));
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
private static boolean isNullOrEmpty(String target) {
|
||||
return target == null || EMPTY.equals(target);
|
||||
}
|
||||
|
||||
/**
|
||||
* Element that is parsed from the format pattern.
|
||||
*/
|
||||
private static class Token {
|
||||
|
||||
private final Object value;
|
||||
|
||||
private int count;
|
||||
|
||||
Token(Object value) {
|
||||
this.value = value;
|
||||
this.count = 1;
|
||||
}
|
||||
|
||||
static boolean containsTokenWithValue(List<Token> tokens, Object value) {
|
||||
for (Token token : tokens) {
|
||||
if (token.getValue().equals(value)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
void increment() {
|
||||
count++;
|
||||
}
|
||||
|
||||
Object getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof Token) {
|
||||
Token tok = (Token) obj;
|
||||
if (this.value.getClass() != tok.value.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (this.count != tok.count) {
|
||||
return false;
|
||||
}
|
||||
if (this.value instanceof StringBuilder) {
|
||||
return this.value.toString().equals(tok.value.toString());
|
||||
} else if (this.value instanceof Number) {
|
||||
return this.value.equals(tok.value);
|
||||
} else {
|
||||
return this.value == tok.value;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return this.value.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return value + " (" + count + ")";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package org.xbib.elx.common;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
public class LongRingBuffer {
|
||||
|
||||
private final Long[] values1, values2;
|
||||
|
||||
private final int limit;
|
||||
|
||||
private final AtomicInteger index;
|
||||
|
||||
public LongRingBuffer(int limit) {
|
||||
this.values1 = new Long[limit];
|
||||
this.values2 = new Long[limit];
|
||||
Arrays.fill(values1, -1L);
|
||||
Arrays.fill(values2, -1L);
|
||||
this.limit = limit;
|
||||
this.index = new AtomicInteger();
|
||||
}
|
||||
|
||||
public int add(Long v1, Long v2) {
|
||||
int i = index.incrementAndGet() % limit;
|
||||
values1[i] = v1;
|
||||
values2[i] = v2;
|
||||
return i;
|
||||
}
|
||||
|
||||
public LongStream longStreamValues1() {
|
||||
return Arrays.stream(values1).filter(v -> v != -1L).mapToLong(Long::longValue);
|
||||
}
|
||||
|
||||
public LongStream longStreamValues2() {
|
||||
return Arrays.stream(values2).filter(v -> v != -1L).mapToLong(Long::longValue);
|
||||
}
|
||||
}
|
|
@ -34,7 +34,6 @@ public class MockAdminClient extends AbstractAdminClient {
|
|||
|
||||
@Override
|
||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -4,28 +4,33 @@ public enum Parameters {
|
|||
|
||||
DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"),
|
||||
|
||||
MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"),
|
||||
BULK_MAX_WAIT_RESPONSE("bulk.max_wait_response", String.class, "30s"),
|
||||
|
||||
MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30),
|
||||
BULK_START_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, -1),
|
||||
|
||||
START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0),
|
||||
BULK_STOP_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
|
||||
|
||||
STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
|
||||
BULK_LOGGING_ENABLED("bulk.logging.enabled", Boolean.class, true),
|
||||
|
||||
ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true),
|
||||
BULK_FAIL_ON_ERROR("bulk.fail_on_error", Boolean.class, true),
|
||||
|
||||
FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true),
|
||||
BULK_MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, -1),
|
||||
|
||||
MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000),
|
||||
BULK_MIN_VOLUME_PER_REQUEST("bulk.min_volume_per_request", String.class, "1k"),
|
||||
|
||||
RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 64),
|
||||
BULK_MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1m"),
|
||||
|
||||
// 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests
|
||||
MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
|
||||
BULK_FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"),
|
||||
|
||||
MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1mb"),
|
||||
BULK_MEASURE_INTERVAL("bulk.measure_interval", String.class, "1s"),
|
||||
|
||||
FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s");
|
||||
BULK_METRIC_LOG_INTERVAL("bulk.metric_log_interval", String.class, "10s"),
|
||||
|
||||
BULK_RING_BUFFER_SIZE("bulk.ring_buffer_size", Integer.class, Runtime.getRuntime().availableProcessors()),
|
||||
|
||||
BULK_PERMITS("bulk.permits", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
|
||||
|
||||
SEARCH_METRIC_LOG_INTERVAL("search.metric_log_interval", String.class, "10s");
|
||||
|
||||
private final String name;
|
||||
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import org.apache.tools.ant.taskdefs.condition.Os
|
||||
|
||||
dependencies{
|
||||
api project(':elx-common')
|
||||
api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
|
||||
api "org.xbib:netty-http-client:${project.property('xbib-netty-http.version')}"
|
||||
api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
|
||||
runtimeOnly "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
|
||||
runtimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ public class HttpAdminClient extends AbstractAdminClient implements Elasticsearc
|
|||
private final HttpClientHelper helper;
|
||||
|
||||
public HttpAdminClient() {
|
||||
super();
|
||||
this.helper = new HttpClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ public class HttpBulkClient extends AbstractBulkClient implements ElasticsearchC
|
|||
private final HttpClientHelper helper;
|
||||
|
||||
public HttpBulkClient() {
|
||||
super();
|
||||
this.helper = new HttpClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ public class HttpSearchClient extends AbstractSearchClient implements Elasticsea
|
|||
private final HttpClientHelper helper;
|
||||
|
||||
public HttpSearchClient() {
|
||||
super();
|
||||
this.helper = new HttpClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -27,8 +27,6 @@ class BulkClientTest {
|
|||
|
||||
private static final Long ACTIONS = 100000L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
BulkClientTest(TestExtension.Helper helper) {
|
||||
|
@ -40,7 +38,6 @@ class BulkClientTest {
|
|||
try (HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), TimeValue.timeValueSeconds(5))
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -52,19 +49,17 @@ class BulkClientTest {
|
|||
try (HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "30s")
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,8 +69,6 @@ class BulkClientTest {
|
|||
try (HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.name(), "60s")
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -84,11 +77,11 @@ class BulkClientTest {
|
|||
bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}");
|
||||
}
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
}
|
||||
|
@ -97,18 +90,13 @@ class BulkClientTest {
|
|||
@Test
|
||||
void testThreadedRandomDocs() throws Exception {
|
||||
int maxthreads = Runtime.getRuntime().availableProcessors();
|
||||
long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
||||
final long actions = ACTIONS;
|
||||
long timeout = 120L;
|
||||
try (HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
indexDefinition.setStartBulkRefreshSeconds(0); // disable refresh
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.startBulk(indexDefinition);
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(maxthreads);
|
||||
|
@ -132,11 +120,11 @@ class BulkClientTest {
|
|||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.http.HttpBulkClient;
|
||||
import org.xbib.elx.http.HttpBulkClientProvider;
|
||||
|
||||
|
@ -24,8 +23,6 @@ class DuplicateIDTest {
|
|||
|
||||
private static final Long ACTIONS = 100L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 5L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
DuplicateIDTest(TestExtension.Helper helper) {
|
||||
|
@ -38,7 +35,6 @@ class DuplicateIDTest {
|
|||
try (HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -50,11 +46,11 @@ class DuplicateIDTest {
|
|||
bulkClient.refreshIndex(indexDefinition);
|
||||
long hits = bulkClient.getSearchableDocs(indexDefinition);
|
||||
assertTrue(hits < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,10 +85,10 @@ class IndexPruneTest {
|
|||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import org.apache.logging.log4j.Level;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -23,7 +22,6 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -109,10 +107,10 @@ class IndexShiftTest {
|
|||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.http.HttpBulkClient;
|
||||
import org.xbib.elx.http.HttpBulkClientProvider;
|
||||
import org.xbib.elx.http.HttpSearchClient;
|
||||
|
@ -28,8 +27,6 @@ class SearchTest {
|
|||
|
||||
private static final Long ACTIONS = 100000L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
|
@ -43,7 +40,6 @@ class SearchTest {
|
|||
try (HttpBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(HttpBulkClientProvider.class)
|
||||
.put(helper.getHttpSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
for (int i = 0; i < ACTIONS; i++) {
|
||||
|
@ -53,11 +49,11 @@ class SearchTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
try (HttpSearchClient searchClient = ClientBuilder.builder()
|
||||
.setSearchClientProvider(HttpSearchClientProvider.class)
|
||||
|
|
|
@ -64,12 +64,12 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
|
|
|
@ -10,6 +10,7 @@ public class NodeAdminClient extends AbstractAdminClient {
|
|||
private final NodeClientHelper helper;
|
||||
|
||||
public NodeAdminClient() {
|
||||
super();
|
||||
this.helper = new NodeClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ public class NodeBulkClient extends AbstractBulkClient {
|
|||
private final NodeClientHelper helper;
|
||||
|
||||
public NodeBulkClient() {
|
||||
super();
|
||||
this.helper = new NodeClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ public class NodeSearchClient extends AbstractSearchClient {
|
|||
private final NodeClientHelper helper;
|
||||
|
||||
public NodeSearchClient() {
|
||||
super();
|
||||
this.helper = new NodeClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -2,12 +2,15 @@ package org.xbib.elx.node.test;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.node.NodeAdminClient;
|
||||
import org.xbib.elx.node.NodeAdminClientProvider;
|
||||
import org.xbib.elx.node.NodeBulkClient;
|
||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||
|
||||
|
@ -26,8 +29,6 @@ class BulkClientTest {
|
|||
|
||||
private static final Long ACTIONS = 100000L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
BulkClientTest(TestExtension.Helper helper) {
|
||||
|
@ -39,7 +40,6 @@ class BulkClientTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "5s")
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -51,18 +51,16 @@ class BulkClientTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "30s")
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,8 +70,6 @@ class BulkClientTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s")
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -84,11 +80,11 @@ class BulkClientTest {
|
|||
}
|
||||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
}
|
||||
|
@ -97,15 +93,11 @@ class BulkClientTest {
|
|||
@Test
|
||||
void testThreadedRandomDocs() throws Exception {
|
||||
int maxthreads = Runtime.getRuntime().availableProcessors();
|
||||
long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
|
||||
final long actions = ACTIONS;
|
||||
long timeout = 120L;
|
||||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
indexDefinition.setStartBulkRefreshSeconds(0);
|
||||
|
@ -132,11 +124,11 @@ class BulkClientTest {
|
|||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ class DuplicateIDTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -49,11 +49,11 @@ class DuplicateIDTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,10 +87,10 @@ class IndexPruneTest {
|
|||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,10 +107,10 @@ class IndexShiftTest {
|
|||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ class SearchTest {
|
|||
try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client())
|
||||
.setBulkClientProvider(NodeBulkClientProvider.class)
|
||||
.put(helper.getNodeSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.startBulk(indexDefinition);
|
||||
|
@ -56,11 +56,11 @@ class SearchTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client())
|
||||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||
|
|
|
@ -67,12 +67,12 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
|
|
|
@ -15,6 +15,7 @@ public class TransportAdminClient extends AbstractAdminClient {
|
|||
private final TransportClientHelper helper;
|
||||
|
||||
public TransportAdminClient() {
|
||||
super();
|
||||
this.helper = new TransportClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ public class TransportBulkClient extends AbstractBulkClient {
|
|||
private final TransportClientHelper helper;
|
||||
|
||||
public TransportBulkClient() {
|
||||
super();
|
||||
this.helper = new TransportClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.xbib.elx.transport;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -19,9 +18,6 @@ import org.elasticsearch.common.network.NetworkModule;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.transport.Netty4Plugin;
|
||||
|
||||
|
@ -43,26 +39,14 @@ public class TransportClientHelper {
|
|||
|
||||
private static final Map<String, ElasticsearchClient> clientMap = new HashMap<>();
|
||||
|
||||
protected ElasticsearchClient createClient(Settings settings) throws IOException {
|
||||
if (settings != null) {
|
||||
String systemIdentifier = System.getProperty("os.name")
|
||||
+ " " + System.getProperty("java.vm.name")
|
||||
+ " " + System.getProperty("java.vm.vendor")
|
||||
+ " " + System.getProperty("java.vm.version")
|
||||
+ " Elasticsearch " + Version.CURRENT.toString();
|
||||
Settings transportClientSettings = getTransportClientSettings(settings);
|
||||
XContentBuilder effectiveSettingsBuilder = XContentFactory.jsonBuilder().startObject();
|
||||
logger.log(Level.INFO, "creating transport client on {} with settings {}",
|
||||
systemIdentifier,
|
||||
Strings.toString(transportClientSettings.toXContent(effectiveSettingsBuilder,
|
||||
ToXContent.EMPTY_PARAMS).endObject()));
|
||||
return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class));
|
||||
}
|
||||
return null;
|
||||
public ElasticsearchClient createClient(Settings settings) {
|
||||
String clusterName = settings.get("cluster.name", "elasticsearch");
|
||||
return clientMap.computeIfAbsent(clusterName, key -> innerCreateClient(settings));
|
||||
}
|
||||
|
||||
public void closeClient(Settings settings) {
|
||||
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name"));
|
||||
String clusterName = settings.get("cluster.name", "elasticsearch");
|
||||
ElasticsearchClient client = clientMap.remove(clusterName);
|
||||
if (client != null) {
|
||||
if (client instanceof Client) {
|
||||
((Client) client).close();
|
||||
|
@ -74,7 +58,7 @@ public class TransportClientHelper {
|
|||
public void init(TransportClient transportClient, Settings settings) throws IOException {
|
||||
Collection<TransportAddress> addrs = findAddresses(settings);
|
||||
if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) {
|
||||
throw new NoNodeAvailableException("no cluster nodes available, check settings "
|
||||
throw new NoNodeAvailableException("no cluster nodes available, check settings = "
|
||||
+ Strings.toString(settings));
|
||||
}
|
||||
}
|
||||
|
@ -128,6 +112,18 @@ public class TransportClientHelper {
|
|||
return false;
|
||||
}
|
||||
|
||||
private ElasticsearchClient innerCreateClient(Settings settings) {
|
||||
String systemIdentifier = System.getProperty("os.name")
|
||||
+ " " + System.getProperty("java.vm.name")
|
||||
+ " " + System.getProperty("java.vm.vendor")
|
||||
+ " " + System.getProperty("java.vm.version")
|
||||
+ " Elasticsearch " + Version.CURRENT.toString();
|
||||
logger.info("creating transport client on {} with custom settings {}",
|
||||
systemIdentifier, Strings.toString(settings));
|
||||
Settings transportClientSettings = getTransportClientSettings(settings);
|
||||
return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class));
|
||||
}
|
||||
|
||||
private Settings getTransportClientSettings(Settings settings) {
|
||||
return Settings.builder()
|
||||
// "cluster.name"
|
||||
|
|
|
@ -14,6 +14,7 @@ public class TransportSearchClient extends AbstractSearchClient {
|
|||
private final TransportClientHelper helper;
|
||||
|
||||
public TransportSearchClient() {
|
||||
super();
|
||||
this.helper = new TransportClientHelper();
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||
|
||||
|
@ -26,8 +25,6 @@ class BulkClientTest {
|
|||
|
||||
private static final Long ACTIONS = 100000L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
BulkClientTest(TestExtension.Helper helper) {
|
||||
|
@ -35,38 +32,32 @@ class BulkClientTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void testSingleDoc() throws Exception {
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
void testNewIndex() throws Exception {
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "30s")
|
||||
.build();
|
||||
try {
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
} finally {
|
||||
assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
bulkClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testNewIndex() throws Exception {
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
void testSingleDoc() throws Exception {
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "5s")
|
||||
.build();
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.close();
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -75,8 +66,6 @@ class BulkClientTest {
|
|||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s")
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -87,11 +76,11 @@ class BulkClientTest {
|
|||
}
|
||||
bulkClient.flush();
|
||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
}
|
||||
|
@ -105,9 +94,6 @@ class BulkClientTest {
|
|||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
|
@ -133,11 +119,11 @@ class BulkClientTest {
|
|||
bulkClient.stopBulk(indexDefinition);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ class DuplicateIDTest {
|
|||
long numactions = ACTIONS;
|
||||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc");
|
||||
|
@ -49,11 +49,11 @@ class DuplicateIDTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS);
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,10 +85,10 @@ class IndexPruneTest {
|
|||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,11 +37,11 @@ class IndexShiftTest {
|
|||
|
||||
@Test
|
||||
void testIndexShift() throws Exception {
|
||||
try (final TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
try (TransportAdminClient adminClient = ClientBuilder.builder()
|
||||
.setAdminClientProvider(TransportAdminClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build();
|
||||
final TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.build()) {
|
||||
|
@ -104,10 +104,10 @@ class IndexShiftTest {
|
|||
assertTrue(aliases.containsKey("d"));
|
||||
assertTrue(aliases.containsKey("e"));
|
||||
assertTrue(aliases.containsKey("f"));
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||
import org.xbib.elx.api.IndexDefinition;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.common.DefaultIndexDefinition;
|
||||
import org.xbib.elx.common.Parameters;
|
||||
import org.xbib.elx.transport.TransportBulkClient;
|
||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||
import org.xbib.elx.transport.TransportSearchClient;
|
||||
|
@ -29,8 +28,6 @@ class SearchTest {
|
|||
|
||||
private static final Long ACTIONS = 100000L;
|
||||
|
||||
private static final Long MAX_ACTIONS_PER_REQUEST = 100L;
|
||||
|
||||
private final TestExtension.Helper helper;
|
||||
|
||||
SearchTest(TestExtension.Helper helper) {
|
||||
|
@ -44,7 +41,6 @@ class SearchTest {
|
|||
try (TransportBulkClient bulkClient = ClientBuilder.builder()
|
||||
.setBulkClientProvider(TransportBulkClientProvider.class)
|
||||
.put(helper.getTransportSettings())
|
||||
.put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST)
|
||||
.build()) {
|
||||
bulkClient.newIndex(indexDefinition);
|
||||
bulkClient.startBulk(indexDefinition);
|
||||
|
@ -55,11 +51,11 @@ class SearchTest {
|
|||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
bulkClient.refreshIndex(indexDefinition);
|
||||
assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition));
|
||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
try (TransportSearchClient searchClient = ClientBuilder.builder()
|
||||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||
|
|
|
@ -67,12 +67,12 @@ class SmokeTest {
|
|||
adminClient.updateReplicaLevel(indexDefinition, 2);
|
||||
int replica = adminClient.getReplicaLevel(indexDefinition);
|
||||
assertEquals(2, replica);
|
||||
assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||
assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount());
|
||||
assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount());
|
||||
if (bulkClient.getBulkProcessor().getLastBulkError() != null) {
|
||||
logger.error("error", bulkClient.getBulkProcessor().getLastBulkError());
|
||||
}
|
||||
assertNull(bulkClient.getBulkController().getLastBulkError());
|
||||
assertNull(bulkClient.getBulkProcessor().getLastBulkError());
|
||||
adminClient.deleteIndex(indexDefinition);
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
group = org.xbib
|
||||
name = elx
|
||||
version = 7.10.2.0
|
||||
version = 7.10.2.1
|
||||
|
||||
gradle.wrapper.version = 6.6.1
|
||||
xbib-metrics.version = 2.1.0
|
||||
xbib-time.version = 2.1.0
|
||||
xbib-netty-http.version = 4.1.63.0
|
||||
elasticsearch.version = 7.10.2
|
||||
# ES 7.10.2 uses Jackson 2.10.4
|
||||
|
|
Loading…
Reference in a new issue