cleaning up
This commit is contained in:
parent
a21e0aef5e
commit
6146da9554
17 changed files with 121 additions and 106 deletions
|
@ -24,8 +24,6 @@ public interface AdminClient extends BasicClient {
|
||||||
|
|
||||||
Map<String, ?> getMapping(String index) throws IOException;
|
Map<String, ?> getMapping(String index) throws IOException;
|
||||||
|
|
||||||
Map<String, ?> getMapping(String index, String type) throws IOException;
|
|
||||||
|
|
||||||
void checkMapping(String index);
|
void checkMapping(String index);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -165,7 +165,6 @@ public interface BulkClient extends BasicClient, Flushable {
|
||||||
void startBulk(String index, long startRefreshIntervalSeconds,
|
void startBulk(String index, long startRefreshIntervalSeconds,
|
||||||
long stopRefreshIntervalSeconds) throws IOException;
|
long stopRefreshIntervalSeconds) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop bulk mode.
|
* Stop bulk mode.
|
||||||
*
|
*
|
||||||
|
|
|
@ -4,7 +4,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A bulk listener for the execution.
|
* A bulk listener for following executions of bulk operations.
|
||||||
*/
|
*/
|
||||||
public interface BulkListener {
|
public interface BulkListener {
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package org.xbib.elx.api;
|
package org.xbib.elx.api;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.Flushable;
|
import java.io.Flushable;
|
||||||
|
@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public interface BulkProcessor extends Closeable, Flushable {
|
public interface BulkProcessor extends Closeable, Flushable {
|
||||||
|
|
||||||
BulkProcessor add(ActionRequest request);
|
BulkProcessor add(DocWriteRequest<?> request);
|
||||||
|
|
||||||
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
|
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,12 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public interface IndexDefinition {
|
public interface IndexDefinition {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The one and only index type name used in the extended client.
|
||||||
|
* Note that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
|
||||||
|
*/
|
||||||
|
String TYPE_NAME = "_doc";
|
||||||
|
|
||||||
IndexDefinition setIndex(String index);
|
IndexDefinition setIndex(String index);
|
||||||
|
|
||||||
String getIndex();
|
String getIndex();
|
||||||
|
|
|
@ -87,16 +87,12 @@ import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.xbib.elx.api.IndexDefinition.TYPE_NAME;
|
||||||
|
|
||||||
public abstract class AbstractAdminClient extends AbstractBasicClient implements AdminClient {
|
public abstract class AbstractAdminClient extends AbstractBasicClient implements AdminClient {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
|
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
|
||||||
|
|
||||||
/**
|
|
||||||
* The one and only index type name used in the extended client.
|
|
||||||
* Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_".
|
|
||||||
*/
|
|
||||||
private static final String TYPE_NAME = "doc";
|
|
||||||
|
|
||||||
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
|
private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() {
|
||||||
@Override
|
@Override
|
||||||
public List<String> getMovedAliases() {
|
public List<String> getMovedAliases() {
|
||||||
|
@ -133,17 +129,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, ?> getMapping(String index) {
|
public Map<String, ?> getMapping(String index) {
|
||||||
return getMapping(index, TYPE_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, ?> getMapping(String index, String mapping) {
|
|
||||||
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
|
||||||
.setIndices(index)
|
.setIndices(index)
|
||||||
.setTypes(mapping);
|
.setTypes(TYPE_NAME);
|
||||||
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
|
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
|
||||||
logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap());
|
return getMappingsResponse.getMappings().get(index).get(TYPE_NAME).getSourceAsMap();
|
||||||
return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -377,7 +367,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
|
||||||
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
|
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE);
|
||||||
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
|
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
|
||||||
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
|
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
|
||||||
logger.info("{} indices", getIndexResponse.getIndices().length);
|
logger.info("found {} indices for pruning", getIndexResponse.getIndices().length);
|
||||||
List<String> candidateIndices = new ArrayList<>();
|
List<String> candidateIndices = new ArrayList<>();
|
||||||
for (String s : getIndexResponse.getIndices()) {
|
for (String s : getIndexResponse.getIndices()) {
|
||||||
Matcher m = pattern.matcher(s);
|
Matcher m = pattern.matcher(s);
|
||||||
|
|
|
@ -55,7 +55,7 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
@Override
|
@Override
|
||||||
public void init(Settings settings) throws IOException {
|
public void init(Settings settings) throws IOException {
|
||||||
if (closed.compareAndSet(false, true)) {
|
if (closed.compareAndSet(false, true)) {
|
||||||
logger.log(Level.DEBUG, "initializing with settings = " + settings.toDelimitedString(','));
|
logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(','));
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
setClient(createClient(settings));
|
setClient(createClient(settings));
|
||||||
} else {
|
} else {
|
||||||
|
@ -102,6 +102,7 @@ public abstract class AbstractBasicClient implements BasicClient {
|
||||||
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
|
||||||
ensureClientIsPresent();
|
ensureClientIsPresent();
|
||||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||||
|
logger.log(Level.DEBUG, "waiting " + timeout + " for shard settling down");
|
||||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||||
.waitForNoInitializingShards(true)
|
.waitForNoInitializingShards(true)
|
||||||
.waitForNoRelocatingShards(true)
|
.waitForNoRelocatingShards(true)
|
||||||
|
|
|
@ -4,7 +4,7 @@ import org.apache.logging.log4j.Level;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||||
import org.elasticsearch.action.admin.indices.flush.FlushAction;
|
import org.elasticsearch.action.admin.indices.flush.FlushAction;
|
||||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||||
|
@ -31,6 +31,8 @@ import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.xbib.elx.api.IndexDefinition.TYPE_NAME;
|
||||||
|
|
||||||
public abstract class AbstractBulkClient extends AbstractBasicClient implements BulkClient {
|
public abstract class AbstractBulkClient extends AbstractBasicClient implements BulkClient {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName());
|
private static final Logger logger = LogManager.getLogger(AbstractBulkClient.class.getName());
|
||||||
|
@ -43,7 +45,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
public void init(Settings settings) throws IOException {
|
public void init(Settings settings) throws IOException {
|
||||||
if (closed.compareAndSet(true, false)) {
|
if (closed.compareAndSet(true, false)) {
|
||||||
super.init(settings);
|
super.init(settings);
|
||||||
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
|
logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(','));
|
||||||
bulkController = new DefaultBulkController(this);
|
bulkController = new DefaultBulkController(this);
|
||||||
bulkController.init(settings);
|
bulkController.init(settings);
|
||||||
} else {
|
} else {
|
||||||
|
@ -96,32 +98,43 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
|
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
|
||||||
String mappingString = Strings.toString(builder);
|
if (mapping == null || mapping.isEmpty()) {
|
||||||
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
newIndex(index, settings, (XContentBuilder) null);
|
||||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingString).mapOrdered();
|
} else {
|
||||||
newIndex(index, settings, mappings);
|
newIndex(index, settings, JsonXContent.contentBuilder().map(mapping));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
|
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
|
||||||
if (index == null) {
|
if (index == null) {
|
||||||
logger.warn("no index name given to create index");
|
logger.warn("unable to create index, no index name given");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ensureClientIsPresent();
|
ensureClientIsPresent();
|
||||||
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
waitForCluster("YELLOW", 30L, TimeUnit.SECONDS);
|
||||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index);
|
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
|
||||||
|
.setIndex(index);
|
||||||
if (settings != null) {
|
if (settings != null) {
|
||||||
createIndexRequest.settings(settings);
|
createIndexRequestBuilder.setSettings(settings);
|
||||||
}
|
}
|
||||||
if (mapping != null) {
|
if (builder != null) {
|
||||||
createIndexRequest.mapping("_doc", mapping);
|
createIndexRequestBuilder.addMapping(TYPE_NAME, builder);
|
||||||
}
|
logger.debug("adding mapping = {}", Strings.toString(builder));
|
||||||
CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet();
|
} else {
|
||||||
|
createIndexRequestBuilder.addMapping(TYPE_NAME,
|
||||||
|
JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject());
|
||||||
|
logger.debug("empty mapping");
|
||||||
|
}
|
||||||
|
|
||||||
|
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
|
||||||
if (createIndexResponse.isAcknowledged()) {
|
if (createIndexResponse.isAcknowledged()) {
|
||||||
logger.info("index {} created", index);
|
logger.info("index {} created", index);
|
||||||
|
} else {
|
||||||
|
logger.warn("index creation of {} not acknowledged", index);
|
||||||
}
|
}
|
||||||
|
refreshIndex(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -156,7 +169,8 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BulkClient index(String index, String id, boolean create, String source) {
|
public BulkClient index(String index, String id, boolean create, String source) {
|
||||||
return index(index, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
|
return index(new IndexRequest().index(index).id(id).create(create)
|
||||||
|
.source(new BytesArray(source.getBytes(StandardCharsets.UTF_8)), XContentType.JSON));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -167,8 +181,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BulkClient index(IndexRequest indexRequest) {
|
public BulkClient index(IndexRequest indexRequest) {
|
||||||
ensureClientIsPresent();
|
if (bulkController != null) {
|
||||||
bulkController.bulkIndex(indexRequest);
|
ensureClientIsPresent();
|
||||||
|
bulkController.bulkIndex(indexRequest);
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,8 +195,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BulkClient delete(DeleteRequest deleteRequest) {
|
public BulkClient delete(DeleteRequest deleteRequest) {
|
||||||
ensureClientIsPresent();
|
if (bulkController != null) {
|
||||||
bulkController.bulkDelete(deleteRequest);
|
ensureClientIsPresent();
|
||||||
|
bulkController.bulkDelete(deleteRequest);
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -132,8 +132,7 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
||||||
searchMetric.getCurrentQueries().dec();
|
searchMetric.getCurrentQueries().dec();
|
||||||
searchMetric.getQueries().inc();
|
searchMetric.getQueries().inc();
|
||||||
searchMetric.markTotalQueries(1);
|
searchMetric.markTotalQueries(1);
|
||||||
boolean isempty = originalSearchResponse.getHits().getTotalHits().value == 0;
|
if (originalSearchResponse.getHits().getTotalHits().value == 0) {
|
||||||
if (isempty) {
|
|
||||||
searchMetric.getEmptyQueries().inc();
|
searchMetric.getEmptyQueries().inc();
|
||||||
} else {
|
} else {
|
||||||
searchMetric.getSucceededQueries().inc();
|
searchMetric.getSucceededQueries().inc();
|
||||||
|
@ -150,6 +149,11 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement
|
||||||
searchMetric.getCurrentQueries().dec();
|
searchMetric.getCurrentQueries().dec();
|
||||||
searchMetric.getQueries().inc();
|
searchMetric.getQueries().inc();
|
||||||
searchMetric.markTotalQueries(1);
|
searchMetric.markTotalQueries(1);
|
||||||
|
if ( searchResponse1.getHits().getHits().length == 0) {
|
||||||
|
searchMetric.getEmptyQueries().inc();
|
||||||
|
} else {
|
||||||
|
searchMetric.getSucceededQueries().inc();
|
||||||
|
}
|
||||||
return searchResponse1;
|
return searchResponse1;
|
||||||
});
|
});
|
||||||
Predicate<SearchResponse> condition = searchResponse -> searchResponse.getHits().getHits().length > 0;
|
Predicate<SearchResponse> condition = searchResponse -> searchResponse.getHits().getHits().length > 0;
|
||||||
|
|
|
@ -23,6 +23,8 @@ import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.xbib.elx.api.IndexDefinition.TYPE_NAME;
|
||||||
|
|
||||||
public class DefaultBulkController implements BulkController {
|
public class DefaultBulkController implements BulkController {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(DefaultBulkController.class);
|
private static final Logger logger = LogManager.getLogger(DefaultBulkController.class);
|
||||||
|
@ -126,8 +128,8 @@ public class DefaultBulkController implements BulkController {
|
||||||
public void bulkIndex(IndexRequest indexRequest) {
|
public void bulkIndex(IndexRequest indexRequest) {
|
||||||
ensureActiveAndBulk();
|
ensureActiveAndBulk();
|
||||||
try {
|
try {
|
||||||
bulkMetric.getCurrentIngest().inc(indexRequest.index(), "_doc", indexRequest.id());
|
|
||||||
bulkProcessor.add(indexRequest);
|
bulkProcessor.add(indexRequest);
|
||||||
|
bulkMetric.getCurrentIngest().inc(indexRequest.index(), TYPE_NAME, indexRequest.id());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
logger.error("bulk add of index failed: " + e.getMessage(), e);
|
logger.error("bulk add of index failed: " + e.getMessage(), e);
|
||||||
|
@ -140,8 +142,8 @@ public class DefaultBulkController implements BulkController {
|
||||||
public void bulkDelete(DeleteRequest deleteRequest) {
|
public void bulkDelete(DeleteRequest deleteRequest) {
|
||||||
ensureActiveAndBulk();
|
ensureActiveAndBulk();
|
||||||
try {
|
try {
|
||||||
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), "_doc", deleteRequest.id());
|
|
||||||
bulkProcessor.add(deleteRequest);
|
bulkProcessor.add(deleteRequest);
|
||||||
|
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), TYPE_NAME, deleteRequest.id());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
logger.error("bulk add of delete failed: " + e.getMessage(), e);
|
logger.error("bulk add of delete failed: " + e.getMessage(), e);
|
||||||
|
@ -154,8 +156,8 @@ public class DefaultBulkController implements BulkController {
|
||||||
public void bulkUpdate(UpdateRequest updateRequest) {
|
public void bulkUpdate(UpdateRequest updateRequest) {
|
||||||
ensureActiveAndBulk();
|
ensureActiveAndBulk();
|
||||||
try {
|
try {
|
||||||
bulkMetric.getCurrentIngest().inc(updateRequest.index(), "_doc", updateRequest.id());
|
|
||||||
bulkProcessor.add(updateRequest);
|
bulkProcessor.add(updateRequest);
|
||||||
|
bulkMetric.getCurrentIngest().inc(updateRequest.index(), TYPE_NAME, updateRequest.id());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
logger.error("bulk add of update failed: " + e.getMessage(), e);
|
logger.error("bulk add of update failed: " + e.getMessage(), e);
|
||||||
|
|
|
@ -1,13 +1,10 @@
|
||||||
package org.xbib.elx.common;
|
package org.xbib.elx.common;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkAction;
|
import org.elasticsearch.action.bulk.BulkAction;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
|
||||||
import org.elasticsearch.client.ElasticsearchClient;
|
import org.elasticsearch.client.ElasticsearchClient;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
@ -83,6 +80,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BulkListener getBulkListener() {
|
||||||
|
return bulkListener;
|
||||||
|
}
|
||||||
|
|
||||||
public static Builder builder(ElasticsearchClient client,
|
public static Builder builder(ElasticsearchClient client,
|
||||||
BulkListener bulkListener) {
|
BulkListener bulkListener) {
|
||||||
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
|
Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null");
|
||||||
|
@ -143,11 +145,6 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
return bulkRequestHandler.close(timeout, unit);
|
return bulkRequestHandler.close(timeout, unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public BulkListener getBulkListener() {
|
|
||||||
return bulkListener;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds either a delete or an index request.
|
* Adds either a delete or an index request.
|
||||||
*
|
*
|
||||||
|
@ -155,8 +152,14 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
* @return his bulk processor
|
* @return his bulk processor
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public DefaultBulkProcessor add(ActionRequest request) {
|
public synchronized DefaultBulkProcessor add(DocWriteRequest<?> request) {
|
||||||
internalAdd(request);
|
ensureOpen();
|
||||||
|
bulkRequest.add(request);
|
||||||
|
if (bulkActions != -1 &&
|
||||||
|
bulkRequest.numberOfActions() >= bulkActions ||
|
||||||
|
bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
|
||||||
|
execute();
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,41 +193,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void internalAdd(ActionRequest request) {
|
|
||||||
ensureOpen();
|
|
||||||
if (request instanceof IndexRequest) {
|
|
||||||
bulkRequest.add((IndexRequest) request);
|
|
||||||
} else if (request instanceof DeleteRequest) {
|
|
||||||
bulkRequest.add((DeleteRequest) request);
|
|
||||||
} else if (request instanceof UpdateRequest) {
|
|
||||||
bulkRequest.add((UpdateRequest) request);
|
|
||||||
} else {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
executeIfNeeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeIfNeeded() {
|
|
||||||
ensureOpen();
|
|
||||||
if (!isOverTheLimit()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
execute();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void execute() {
|
private void execute() {
|
||||||
final BulkRequest myBulkRequest = this.bulkRequest;
|
BulkRequest myBulkRequest = this.bulkRequest;
|
||||||
final long executionId = executionIdGen.incrementAndGet();
|
long executionId = executionIdGen.incrementAndGet();
|
||||||
this.bulkRequest = new BulkRequest();
|
this.bulkRequest = new BulkRequest();
|
||||||
this.bulkRequestHandler.execute(myBulkRequest, executionId);
|
this.bulkRequestHandler.execute(myBulkRequest, executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isOverTheLimit() {
|
|
||||||
return bulkActions != -1 &&
|
|
||||||
bulkRequest.numberOfActions() >= bulkActions ||
|
|
||||||
bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A builder used to create a build an instance of a bulk processor.
|
* A builder used to create a build an instance of a bulk processor.
|
||||||
*/
|
*/
|
||||||
|
@ -393,7 +368,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(final BulkRequest bulkRequest, final long executionId) {
|
public void execute(BulkRequest bulkRequest, long executionId) {
|
||||||
boolean bulkRequestSetupSuccessful = false;
|
boolean bulkRequestSetupSuccessful = false;
|
||||||
boolean acquired = false;
|
boolean acquired = false;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -92,7 +92,7 @@ class BulkClientTest {
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject();
|
.endObject();
|
||||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||||
assertTrue(adminClient.getMapping("test", "_doc").containsKey("properties"));
|
assertTrue(adminClient.getMapping("test").containsKey("properties"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,8 @@ class SearchTest {
|
||||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||||
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
|
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
|
||||||
bulkClient.flush();
|
bulkClient.flush();
|
||||||
|
bulkClient.refreshIndex("test");
|
||||||
|
assertEquals(numactions + 1, bulkClient.getSearchableDocs("test"));
|
||||||
}
|
}
|
||||||
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||||
|
@ -84,8 +86,8 @@ class SearchTest {
|
||||||
});
|
});
|
||||||
assertEquals(numactions + 1, idcount.get());
|
assertEquals(numactions + 1, idcount.get());
|
||||||
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
|
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
|
||||||
assertEquals(3, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||||
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ class BulkClientTest {
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject();
|
.endObject();
|
||||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||||
assertTrue(adminClient.getMapping("test", "_doc").containsKey("properties"));
|
assertTrue(adminClient.getMapping("test").containsKey("properties"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
|
@ -15,6 +16,8 @@ import org.xbib.elx.node.NodeBulkClient;
|
||||||
import org.xbib.elx.node.NodeBulkClientProvider;
|
import org.xbib.elx.node.NodeBulkClientProvider;
|
||||||
import org.xbib.elx.node.NodeSearchClient;
|
import org.xbib.elx.node.NodeSearchClient;
|
||||||
import org.xbib.elx.node.NodeSearchClientProvider;
|
import org.xbib.elx.node.NodeSearchClientProvider;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -52,8 +55,13 @@ class SearchTest {
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
bulkClient.refreshIndex("test");
|
bulkClient.refreshIndex("test");
|
||||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||||
|
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
|
||||||
|
bulkClient.flush();
|
||||||
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
|
bulkClient.refreshIndex("test");
|
||||||
|
assertEquals(numactions + 1, bulkClient.getSearchableDocs("test"));
|
||||||
}
|
}
|
||||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||||
}
|
}
|
||||||
|
@ -62,12 +70,14 @@ class SearchTest {
|
||||||
.setSearchClientProvider(NodeSearchClientProvider.class)
|
.setSearchClientProvider(NodeSearchClientProvider.class)
|
||||||
.put(helper.getNodeSettings("1"))
|
.put(helper.getNodeSettings("1"))
|
||||||
.build()) {
|
.build()) {
|
||||||
|
Optional<GetResponse> responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0"));
|
||||||
|
assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString());
|
||||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()),
|
.setQuery(QueryBuilders.matchAllQuery()),
|
||||||
TimeValue.timeValueMinutes(1), 10);
|
TimeValue.timeValueMinutes(1), 10);
|
||||||
long count = stream.count();
|
long count = stream.count();
|
||||||
assertEquals(numactions, count);
|
assertEquals(numactions + 1, count);
|
||||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()));
|
.setQuery(QueryBuilders.matchAllQuery()));
|
||||||
|
@ -76,10 +86,10 @@ class SearchTest {
|
||||||
logger.info(id);
|
logger.info(id);
|
||||||
idcount.incrementAndGet();
|
idcount.incrementAndGet();
|
||||||
});
|
});
|
||||||
assertEquals(numactions, idcount.get());
|
assertEquals(numactions + 1, idcount.get());
|
||||||
assertEquals(13, searchClient.getSearchMetric().getQueries().getCount());
|
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
|
||||||
assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||||
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ class BulkClientTest {
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject();
|
.endObject();
|
||||||
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
bulkClient.newIndex("test", Settings.EMPTY, builder);
|
||||||
assertTrue(adminClient.getMapping("test", "_doc").containsKey("properties"));
|
assertTrue(adminClient.getMapping("test").containsKey("properties"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
|
@ -15,6 +16,8 @@ import org.xbib.elx.transport.TransportBulkClient;
|
||||||
import org.xbib.elx.transport.TransportBulkClientProvider;
|
import org.xbib.elx.transport.TransportBulkClientProvider;
|
||||||
import org.xbib.elx.transport.TransportSearchClient;
|
import org.xbib.elx.transport.TransportSearchClient;
|
||||||
import org.xbib.elx.transport.TransportSearchClientProvider;
|
import org.xbib.elx.transport.TransportSearchClientProvider;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -52,8 +55,13 @@ class SearchTest {
|
||||||
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
bulkClient.refreshIndex("test");
|
bulkClient.refreshIndex("test");
|
||||||
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
assertEquals(numactions, bulkClient.getSearchableDocs("test"));
|
||||||
|
bulkClient.index("test", "0", false, "{\"name\":\"Hello\"}");
|
||||||
|
bulkClient.flush();
|
||||||
|
bulkClient.waitForResponses(30L, TimeUnit.SECONDS);
|
||||||
|
bulkClient.refreshIndex("test");
|
||||||
|
assertEquals(numactions + 1, bulkClient.getSearchableDocs("test"));
|
||||||
}
|
}
|
||||||
assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
assertEquals(numactions + 1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount());
|
||||||
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
if (bulkClient.getBulkController().getLastBulkError() != null) {
|
||||||
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
logger.error("error", bulkClient.getBulkController().getLastBulkError());
|
||||||
}
|
}
|
||||||
|
@ -62,12 +70,14 @@ class SearchTest {
|
||||||
.setSearchClientProvider(TransportSearchClientProvider.class)
|
.setSearchClientProvider(TransportSearchClientProvider.class)
|
||||||
.put(helper.getTransportSettings())
|
.put(helper.getTransportSettings())
|
||||||
.build()) {
|
.build()) {
|
||||||
|
Optional<GetResponse> responseOptional = searchClient.get(grb -> grb.setIndex("test").setId("0"));
|
||||||
|
assertEquals("{\"name\":\"Hello\"}", responseOptional.get().getSourceAsString());
|
||||||
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
Stream<SearchHit> stream = searchClient.search(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()),
|
.setQuery(QueryBuilders.matchAllQuery()),
|
||||||
TimeValue.timeValueMinutes(1), 10);
|
TimeValue.timeValueMinutes(1), 10);
|
||||||
long count = stream.count();
|
long count = stream.count();
|
||||||
assertEquals(numactions, count);
|
assertEquals(numactions + 1, count);
|
||||||
Stream<String> ids = searchClient.getIds(qb -> qb
|
Stream<String> ids = searchClient.getIds(qb -> qb
|
||||||
.setIndices("test")
|
.setIndices("test")
|
||||||
.setQuery(QueryBuilders.matchAllQuery()));
|
.setQuery(QueryBuilders.matchAllQuery()));
|
||||||
|
@ -76,10 +86,10 @@ class SearchTest {
|
||||||
logger.info(id);
|
logger.info(id);
|
||||||
idcount.incrementAndGet();
|
idcount.incrementAndGet();
|
||||||
});
|
});
|
||||||
assertEquals(numactions, idcount.get());
|
assertEquals(numactions + 1, idcount.get());
|
||||||
assertEquals(13, searchClient.getSearchMetric().getQueries().getCount());
|
assertEquals(15, searchClient.getSearchMetric().getQueries().getCount());
|
||||||
assertEquals(2, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
assertEquals(13, searchClient.getSearchMetric().getSucceededQueries().getCount());
|
||||||
assertEquals(0, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue