align with es221

This commit is contained in:
Jörg Prante 2021-04-14 18:55:56 +02:00
parent 21e5dd84c9
commit 6c48d23de1
13 changed files with 265 additions and 484 deletions

View file

@ -1,7 +1,5 @@
package org.xbib.elx.api; package org.xbib.elx.api;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -12,19 +10,9 @@ import java.util.concurrent.TimeUnit;
*/ */
public interface AdminClient extends BasicClient { public interface AdminClient extends BasicClient {
/** Map<String, ?> getMapping(IndexDefinition indexDefinition) throws IOException;
* Build index definition from settings.
*
* @param index the index name
* @param settings the settings for the index
* @return index definition
* @throws IOException if settings/mapping URL is invalid/malformed
*/
IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException;
Map<String, ?> getMapping(String index) throws IOException; void checkMapping(IndexDefinition indexDefinition);
void checkMapping(String index);
/** /**
* Delete an index. * Delete an index.
@ -33,15 +21,6 @@ public interface AdminClient extends BasicClient {
*/ */
AdminClient deleteIndex(IndexDefinition indexDefinition); AdminClient deleteIndex(IndexDefinition indexDefinition);
/**
* Delete an index.
*
* @param index index
* @return this
*/
AdminClient deleteIndex(String index);
/** /**
* Update replica level. * Update replica level.
* @param indexDefinition the index definition * @param indexDefinition the index definition
@ -51,18 +30,6 @@ public interface AdminClient extends BasicClient {
*/ */
AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException; AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException;
/**
* Update replica level.
*
* @param index index
* @param level the replica level
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return this
* @throws IOException if replica setting could not be updated
*/
AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException;
/** /**
* Get replica level. * Get replica level.
* @param indexDefinition the index name * @param indexDefinition the index name
@ -70,13 +37,6 @@ public interface AdminClient extends BasicClient {
*/ */
int getReplicaLevel(IndexDefinition indexDefinition); int getReplicaLevel(IndexDefinition indexDefinition);
/**
* Get replica level.
* @param index the index name
* @return the replica level of the index
*/
int getReplicaLevel(String index);
/** /**
* Force segment merge of an index. * Force segment merge of an index.
* @param indexDefinition the index definition * @param indexDefinition the index definition
@ -84,15 +44,6 @@ public interface AdminClient extends BasicClient {
*/ */
boolean forceMerge(IndexDefinition indexDefinition); boolean forceMerge(IndexDefinition indexDefinition);
/**
* Force segment merge of an index.
* @param index the index
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return this
*/
boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit);
/** /**
* Resolve alias. * Resolve alias.
* *
@ -117,15 +68,6 @@ public interface AdminClient extends BasicClient {
*/ */
Map<String, String> getAliases(String index); Map<String, String> getAliases(String index);
/**
* Shift from one index to another.
* @param indexDefinition the index definition
* @param additionalAliases new aliases
* @return this
*/
IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
List<String> additionalAliases);
/** /**
* Shift from one index to another. * Shift from one index to another.
* @param indexDefinition the index definition * @param indexDefinition the index definition
@ -147,10 +89,10 @@ public interface AdminClient extends BasicClient {
/** /**
* Find the timestamp of the most recently indexed document in the index. * Find the timestamp of the most recently indexed document in the index.
* *
* @param index the index name * @param indexDefinition the index definition
* @param timestampfieldname the timestamp field name * @param timestampfieldname the timestamp field name
* @return millis UTC millis of the most recent document * @return millis UTC millis of the most recent document
* @throws IOException if most rcent document can not be found * @throws IOException if most rcent document can not be found
*/ */
Long mostRecentDocument(String index, String timestampfieldname) throws IOException; Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) throws IOException;
} }

View file

@ -56,7 +56,7 @@ public interface BasicClient extends Closeable {
void waitForShards(long maxWaitTime, TimeUnit timeUnit); void waitForShards(long maxWaitTime, TimeUnit timeUnit);
long getSearchableDocs(String index); long getSearchableDocs(IndexDefinition indexDefinition);
boolean isIndexExists(String index); boolean isIndexExists(IndexDefinition indexDefinition);
} }

View file

@ -4,11 +4,8 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.Flushable; import java.io.Flushable;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface BulkClient extends BasicClient, Flushable { public interface BulkClient extends BasicClient, Flushable {
@ -19,15 +16,6 @@ public interface BulkClient extends BasicClient, Flushable {
*/ */
BulkController getBulkController(); BulkController getBulkController();
/**
* Create a new index.
*
* @param index index
* @throws IOException if new index creation fails
*/
void newIndex(String index) throws IOException;
/** /**
* Create a new index. * Create a new index.
* @param indexDefinition the index definition * @param indexDefinition the index definition
@ -36,57 +24,43 @@ public interface BulkClient extends BasicClient, Flushable {
void newIndex(IndexDefinition indexDefinition) throws IOException; void newIndex(IndexDefinition indexDefinition) throws IOException;
/** /**
* Create a new index. * Start bulk mode for indexes.
* * @param indexDefinition index definition
* @param index index * @throws IOException if bulk could not be started
* @param settings settings
* @throws IOException if settings is invalid or index creation fails
*/ */
void newIndex(String index, Settings settings) throws IOException; void startBulk(IndexDefinition indexDefinition) throws IOException;
/** /**
* Create a new index. * Stop bulk mode.
* *
* @param index index * @param indexDefinition index definition
* @param settings settings * @throws IOException if bulk could not be startet
* @param mapping mapping
* @throws IOException if settings/mapping is invalid or index creation fails
*/ */
void newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException; void stopBulk(IndexDefinition indexDefinition) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @param mapping mapping
* @throws IOException if settings/mapping is invalid or index creation fails
*/
void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException;
/** /**
* Add index request. Each request will be added to a queue for bulking requests. * Add index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded. * Submitting request will be done when limits are exceeded.
* *
* @param index the index * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param create true if document must be created * @param create true if document must be created
* @param source the source * @param source the source
* @return this * @return this
*/ */
BulkClient index(String index, String id, boolean create, BytesReference source); BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source);
/** /**
* Index request. Each request will be added to a queue for bulking requests. * Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded. * Submitting request will be done when limits are exceeded.
* *
* @param index the index * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param create true if document is to be created, false otherwise * @param create true if document is to be created, false otherwise
* @param source the source * @param source the source
* @return this client methods * @return this client methods
*/ */
BulkClient index(String index, String id, boolean create, String source); BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source);
/** /**
* Index request. Each request will be added to a queue for bulking requests. * Index request. Each request will be added to a queue for bulking requests.
@ -100,11 +74,11 @@ public interface BulkClient extends BasicClient, Flushable {
/** /**
* Delete request. * Delete request.
* *
* @param index the index * @param indexDefinition the index definition
* @param id the id * @param id the id
* @return this * @return this
*/ */
BulkClient delete(String index, String id); BulkClient delete(IndexDefinition indexDefinition, String id);
/** /**
* Delete request. Each request will be added to a queue for bulking requests. * Delete request. Each request will be added to a queue for bulking requests.
@ -120,22 +94,22 @@ public interface BulkClient extends BasicClient, Flushable {
* Submitting request will be done when bulk limits are exceeded. * Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized. * Note that updates only work correctly when all operations between nodes are synchronized.
* *
* @param index the index * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
*/ */
BulkClient update(String index, String id, BytesReference source); BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source);
/** /**
* Update document. Use with precaution! Does not work in all cases. * Update document. Use with precaution! Does not work in all cases.
* *
* @param index the index * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
*/ */
BulkClient update(String index, String id, String source); BulkClient update(IndexDefinition indexDefinition, String id, String source);
/** /**
* Bulked update request. Each request will be added to a queue for bulking requests. * Bulked update request. Each request will be added to a queue for bulking requests.
@ -147,42 +121,6 @@ public interface BulkClient extends BasicClient, Flushable {
*/ */
BulkClient update(UpdateRequest updateRequest); BulkClient update(UpdateRequest updateRequest);
/**
* Start bulk mode for indexes.
* @param indexDefinition index definition
* @throws IOException if bulk could not be started
*/
void startBulk(IndexDefinition indexDefinition) throws IOException;
/**
* Start bulk mode.
*
* @param index index
* @param startRefreshIntervalSeconds refresh interval before bulk
* @param stopRefreshIntervalSeconds refresh interval after bulk
* @throws IOException if bulk could not be started
*/
void startBulk(String index, long startRefreshIntervalSeconds,
long stopRefreshIntervalSeconds) throws IOException;
/**
* Stop bulk mode.
*
* @param indexDefinition index definition
* @throws IOException if bulk could not be startet
*/
void stopBulk(IndexDefinition indexDefinition) throws IOException;
/**
* Stops bulk mode.
*
* @param index index
* @param timeout maximum wait time
* @param timeUnit time unit for timeout
* @throws IOException if bulk could not be stopped
*/
void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException;
/** /**
* Wait for all outstanding bulk responses. * Wait for all outstanding bulk responses.
* *
@ -206,15 +144,15 @@ public interface BulkClient extends BasicClient, Flushable {
/** /**
* Refresh the index. * Refresh the index.
* *
* @param index index * @param indexDefinition index definition
*/ */
void refreshIndex(String index); void refreshIndex(IndexDefinition indexDefinition);
/** /**
* Flush the index. The cluster clears cache and completes indexing. * Flush the index. The cluster clears cache and completes indexing.
* *
* @param index index * @param indexDefinition index definition
*/ */
void flushIndex(String index); void flushIndex(IndexDefinition indexDefinition);
} }

View file

@ -22,9 +22,6 @@ public interface BulkController extends Closeable, Flushable {
void startBulkMode(IndexDefinition indexDefinition) throws IOException; void startBulkMode(IndexDefinition indexDefinition) throws IOException;
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
void bulkIndex(IndexRequest indexRequest); void bulkIndex(IndexRequest indexRequest);
void bulkDelete(DeleteRequest deleteRequest); void bulkDelete(DeleteRequest deleteRequest);
@ -34,6 +31,4 @@ public interface BulkController extends Closeable, Flushable {
boolean waitForBulkResponses(long timeout, TimeUnit timeUnit); boolean waitForBulkResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException; void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;
} }

View file

@ -16,6 +16,10 @@ public interface IndexDefinition {
String getIndex(); String getIndex();
IndexDefinition setType(String type);
String getType();
IndexDefinition setFullIndexName(String fullIndexName); IndexDefinition setFullIndexName(String fullIndexName);
String getFullIndexName(); String getFullIndexName();
@ -36,6 +40,14 @@ public interface IndexDefinition {
Pattern getDateTimePattern(); Pattern getDateTimePattern();
IndexDefinition setStartBulkRefreshSeconds(int seconds);
int getStartBulkRefreshSeconds();
IndexDefinition setStopBulkRefreshSeconds(int seconds);
int getStopBulkRefreshSeconds();
IndexDefinition setEnabled(boolean enabled); IndexDefinition setEnabled(boolean enabled);
boolean isEnabled(); boolean isEnabled();
@ -69,12 +81,4 @@ public interface IndexDefinition {
long getMaxWaitTime(); long getMaxWaitTime();
TimeUnit getMaxWaitTimeUnit(); TimeUnit getMaxWaitTimeUnit();
IndexDefinition setStartRefreshInterval(long seconds);
long getStartRefreshInterval();
IndexDefinition setStopRefreshInterval(long seconds);
long getStopRefreshInterval();
} }

View file

@ -37,16 +37,8 @@ import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.yaml.YamlXContent;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -58,16 +50,9 @@ import org.xbib.elx.api.AdminClient;
import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexAliasAdder;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexPruneResult;
import org.xbib.elx.api.IndexRetention;
import org.xbib.elx.api.IndexShiftResult; import org.xbib.elx.api.IndexShiftResult;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -95,22 +80,24 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName());
@Override @Override
public Map<String, ?> getMapping(String index) { public Map<String, ?> getMapping(IndexDefinition indexDefinition) {
if (!ensureIndexDefinition(indexDefinition)) {
return null;
}
GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE)
.setIndices(index) .setIndices(indexDefinition.getFullIndexName())
.setTypes(TYPE_NAME); .setTypes(TYPE_NAME);
GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet();
return getMappingsResponse.getMappings().get(index).get(TYPE_NAME).getSourceAsMap(); return getMappingsResponse.getMappings()
.get(indexDefinition.getFullIndexName())
.get(TYPE_NAME)
.getSourceAsMap();
} }
@Override @Override
public AdminClient deleteIndex(IndexDefinition indexDefinition) { public AdminClient deleteIndex(IndexDefinition indexDefinition) {
return deleteIndex(indexDefinition.getFullIndexName());
}
@Override
public AdminClient deleteIndex(String index) {
ensureClientIsPresent(); ensureClientIsPresent();
String index = indexDefinition.getFullIndexName();
if (index == null) { if (index == null) {
logger.warn("no index name given to delete index"); logger.warn("no index name given to delete index");
return this; return this;
@ -123,27 +110,26 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException {
return updateReplicaLevel(indexDefinition.getFullIndexName(), level, if (!ensureIndexDefinition(indexDefinition)) {
indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); return this;
} }
@Override
public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException {
if (level < 1) { if (level < 1) {
logger.warn("invalid replica level"); logger.warn("invalid replica level");
return this; return this;
} }
String index = indexDefinition.getFullIndexName();
long maxWaitTime = indexDefinition.getMaxWaitTime();
TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit();
updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit);
return this; return this;
} }
@Override @Override
public int getReplicaLevel(IndexDefinition indexDefinition) { public int getReplicaLevel(IndexDefinition indexDefinition) {
return getReplicaLevel(indexDefinition.getFullIndexName()); if (!ensureIndexDefinition(indexDefinition)) {
return -1;
} }
String index = indexDefinition.getFullIndexName();
@Override
public int getReplicaLevel(String index) {
GetSettingsRequest request = new GetSettingsRequest().indices(index); GetSettingsRequest request = new GetSettingsRequest().indices(index);
GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet(); GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet();
int replica = -1; int replica = -1;
@ -158,10 +144,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public String resolveMostRecentIndex(String alias) { public String resolveMostRecentIndex(String alias) {
ensureClientIsPresent();
if (alias == null) { if (alias == null) {
return null; return null;
} }
ensureClientIsPresent();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias); GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias);
GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
@ -211,12 +197,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
.sorted().collect(Collectors.toList()); .sorted().collect(Collectors.toList());
} }
@Override
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
List<String> additionalAliases) {
return shiftIndex(indexDefinition, additionalAliases, null);
}
@Override @Override
public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, public IndexShiftResult shiftIndex(IndexDefinition indexDefinition,
List<String> additionalAliases, List<String> additionalAliases,
@ -381,7 +361,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
} }
@Override @Override
public Long mostRecentDocument(String index, String timestampfieldname) { public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) {
if (!ensureIndexDefinition(indexDefinition)) {
return -1L;
}
ensureClientIsPresent(); ensureClientIsPresent();
SortBuilder<?> sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); SortBuilder<?> sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchSourceBuilder builder = new SearchSourceBuilder(); SearchSourceBuilder builder = new SearchSourceBuilder();
@ -389,7 +372,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
builder.storedField(timestampfieldname); builder.storedField(timestampfieldname);
builder.size(1); builder.size(1);
SearchRequest searchRequest = new SearchRequest(); SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index); searchRequest.indices(indexDefinition.getFullIndexName());
searchRequest.source(builder); searchRequest.source(builder);
SearchResponse searchResponse = SearchResponse searchResponse =
client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
@ -401,25 +384,25 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return 0L; return 0L;
} }
} }
return null; // almost impossible
return -1L;
} }
@Override @Override
public boolean forceMerge(IndexDefinition indexDefinition) { public boolean forceMerge(IndexDefinition indexDefinition) {
if (indexDefinition.isForceMergeEnabled()) { if (!ensureIndexDefinition(indexDefinition)) {
return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime(),
indexDefinition.getMaxWaitTimeUnit());
}
return false; return false;
} }
if (!indexDefinition.isForceMergeEnabled()) {
@Override return false;
public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { }
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ensureClientIsPresent();
String index = indexDefinition.getFullIndexName();
ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest();
forceMergeRequest.indices(index); forceMergeRequest.indices(index);
try { try {
client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).get(timeout.getMillis(), TimeUnit.MILLISECONDS); client.execute(ForceMergeAction.INSTANCE, forceMergeRequest)
.get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
return true; return true;
} catch (TimeoutException e) { } catch (TimeoutException e) {
logger.error("timeout"); logger.error("timeout");
@ -432,45 +415,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
return false; return false;
} }
@Override
public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings)
throws IOException {
boolean isEnabled = settings.getAsBoolean("enabled", false);
String indexName = settings.get("name", index);
String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$");
Pattern dateTimePattern = Pattern.compile(dateTimePatternStr);
String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat)
.withZone(ZoneId.systemDefault());
String fullName = indexName + dateTimeFormatter.format(LocalDate.now());
String fullIndexName = resolveAlias(fullName).stream().findFirst().orElse(fullName);
IndexRetention indexRetention = new DefaultIndexRetention()
.setMinToKeep(settings.getAsInt("retention.mintokeep", 0))
.setDelta(settings.getAsInt("retention.delta", 0));
return new DefaultIndexDefinition()
.setEnabled(isEnabled)
.setIndex(indexName)
.setFullIndexName(fullIndexName)
.setSettings(findSettingsFrom(settings.get("settings")))
.setMappings(findMappingsFrom(settings.get("mapping")))
.setDateTimeFormatter(dateTimeFormatter)
.setDateTimePattern(dateTimePattern)
.setIgnoreErrors(settings.getAsBoolean("skiperrors", false))
.setShift(settings.getAsBoolean("shift", true))
.setPrune(settings.getAsBoolean("prune", true))
.setReplicaLevel(settings.getAsInt("replica", 0))
.setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS)
.setRetention(indexRetention)
.setStartRefreshInterval(settings.getAsLong("bulk.startrefreshinterval", -1L))
.setStopRefreshInterval(settings.getAsLong("bulk.stoprefreshinterval", -1L));
}
@Override @Override
public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException {
ensureClientIsPresent();
if (index == null) { if (index == null) {
throw new IOException("no index name given"); throw new IOException("no index name given");
} }
ensureClientIsPresent();
Settings.Builder updateSettingsBuilder = Settings.builder(); Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString()); updateSettingsBuilder.put(key, value.toString());
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
@ -479,65 +429,20 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
} }
@Override @Override
public void checkMapping(String index) { public void checkMapping(IndexDefinition indexDefinition) {
ensureClientIsPresent(); ensureClientIsPresent();
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(indexDefinition.getFullIndexName());
GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetadata>> map = getMappingsResponse.getMappings(); ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetadata>> map = getMappingsResponse.getMappings();
map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> { map.keys().forEach((Consumer<ObjectCursor<String>>) stringObjectCursor -> {
ImmutableOpenMap<String, MappingMetadata> mappings = map.get(stringObjectCursor.value); ImmutableOpenMap<String, MappingMetadata> mappings = map.get(stringObjectCursor.value);
for (ObjectObjectCursor<String, MappingMetadata> cursor : mappings) { for (ObjectObjectCursor<String, MappingMetadata> cursor : mappings) {
MappingMetadata mappingMetaData = cursor.value; MappingMetadata mappingMetaData = cursor.value;
checkMapping(index, mappingMetaData); checkMapping(indexDefinition.getFullIndexName(), mappingMetaData);
} }
}); });
} }
private static String findSettingsFrom(String string) throws IOException {
if (string == null) {
return null;
}
try {
URL url = new URL(string);
try (InputStream inputStream = url.openStream()) {
Settings settings = Settings.builder().loadFromStream(string, inputStream, true).build();
XContentBuilder builder = JsonXContent.contentBuilder();
settings.toXContent(builder, ToXContent.EMPTY_PARAMS);
return Strings.toString(builder);
}
} catch (MalformedURLException e) {
return string;
}
}
private static String findMappingsFrom(String string) throws IOException {
if (string == null) {
return null;
}
try {
URL url = new URL(string);
try (InputStream inputStream = url.openStream()) {
if (string.endsWith(".json")) {
Map<String, ?> mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered();
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject().map(mappings).endObject();
return Strings.toString(builder);
}
if (string.endsWith(".yml") || string.endsWith(".yaml")) {
Map<String, ?> mappings = YamlXContent.yamlXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered();
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject().map(mappings).endObject();
return Strings.toString(builder);
}
}
return string;
} catch (MalformedURLException e) {
return string;
}
}
private Map<String, String> getFilters(GetAliasesResponse getAliasesResponse) { private Map<String, String> getFilters(GetAliasesResponse getAliasesResponse) {
Map<String, String> result = new HashMap<>(); Map<String, String> result = new HashMap<>();
for (ObjectObjectCursor<String, List<AliasMetadata>> object : getAliasesResponse.getAliases()) { for (ObjectObjectCursor<String, List<AliasMetadata>> object : getAliasesResponse.getAliases()) {
@ -630,8 +535,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
.setQuery(queryBuilder) .setQuery(queryBuilder)
.setSize(0) .setSize(0)
.setTrackTotalHits(true); .setTrackTotalHits(true);
SearchResponse searchResponse = SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
searchRequestBuilder.execute().actionGet();
fields.put(path, searchResponse.getHits().getTotalHits().value); fields.put(path, searchResponse.getHits().getTotalHits().value);
} }
} }
@ -783,7 +687,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
@Override @Override
public String toString() { public String toString() {
return "NOTHING TO DO"; return "EMPTY PRUNE";
} }
} }
} }

View file

@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.xbib.elx.api.BasicClient; import org.xbib.elx.api.BasicClient;
import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -58,8 +59,6 @@ public abstract class AbstractBasicClient implements BasicClient {
logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(',')); logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(','));
this.settings = settings; this.settings = settings;
setClient(createClient(settings)); setClient(createClient(settings));
} else {
logger.log(Level.WARN, "not initializing client");
} }
} }
@ -88,8 +87,11 @@ public abstract class AbstractBasicClient implements BasicClient {
ensureClientIsPresent(); ensureClientIsPresent();
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); .timeout(timeout)
.waitForStatus(status);
ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse != null && healthResponse.isTimedOut()) { if (healthResponse != null && healthResponse.isTimedOut()) {
String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name(); String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name();
logger.error(message); logger.error(message);
@ -100,8 +102,8 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override @Override
public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { public void waitForShards(long maxWaitTime, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();
logger.info("waiting for cluster shard settling");
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
logger.log(Level.DEBUG, "waiting " + timeout + " for shard settling down");
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
.waitForNoInitializingShards(true) .waitForNoInitializingShards(true)
.waitForNoRelocatingShards(true) .waitForNoRelocatingShards(true)
@ -109,7 +111,7 @@ public abstract class AbstractBasicClient implements BasicClient {
ClusterHealthResponse healthResponse = ClusterHealthResponse healthResponse =
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
if (healthResponse.isTimedOut()) { if (healthResponse.isTimedOut()) {
String message = "timeout while waiting for cluster shards: " + timeout; String message = "timeout waiting for cluster shards: " + timeout;
logger.error(message); logger.error(message);
throw new IllegalStateException(message); throw new IllegalStateException(message);
} }
@ -137,9 +139,9 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public long getSearchableDocs(String index) { public long getSearchableDocs(IndexDefinition indexDefinition) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(index) .setIndices(indexDefinition.getFullIndexName())
.setQuery(QueryBuilders.matchAllQuery()) .setQuery(QueryBuilders.matchAllQuery())
.setSize(0) .setSize(0)
.setTrackTotalHits(true); .setTrackTotalHits(true);
@ -147,15 +149,14 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
@Override @Override
public boolean isIndexExists(String index) { public boolean isIndexExists(IndexDefinition indexDefinition) {
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
indicesExistsRequest.indices(index); indicesExistsRequest.indices(indexDefinition.getFullIndexName());
IndicesExistsResponse indicesExistsResponse = IndicesExistsResponse indicesExistsResponse =
client.execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); client.execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
return indicesExistsResponse.isExists(); return indicesExistsResponse.isExists();
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
ensureClientIsPresent(); ensureClientIsPresent();
@ -192,6 +193,14 @@ public abstract class AbstractBasicClient implements BasicClient {
} }
} }
protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) {
if (!indexDefinition.isEnabled()) {
logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled");
return false;
}
return true;
}
protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) { protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) {
switch (timeUnit) { switch (timeUnit) {
case DAYS: case DAYS:

View file

@ -79,78 +79,65 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override @Override
public void newIndex(IndexDefinition indexDefinition) throws IOException { public void newIndex(IndexDefinition indexDefinition) throws IOException {
Settings settings = indexDefinition.getSettings() == null ? null : if (!ensureIndexDefinition(indexDefinition)) {
Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build();
Map<String, ?> mappings = indexDefinition.getMappings() == null ? null :
JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered();
newIndex(indexDefinition.getFullIndexName(), settings, mappings);
}
@Override
public void newIndex(String index) throws IOException {
newIndex(index, Settings.EMPTY, (Map<String, ?>) null);
}
@Override
public void newIndex(String index, Settings settings) throws IOException {
newIndex(index, settings, (Map<String, ?>) null);
}
@Override
public void newIndex(String index, Settings settings, Map<String, ?> mapping) throws IOException {
if (mapping == null || mapping.isEmpty()) {
newIndex(index, settings, (XContentBuilder) null);
} else {
newIndex(index, settings, JsonXContent.contentBuilder().map(mapping));
}
}
@Override
public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException {
if (index == null) {
logger.warn("unable to create index, no index name given");
return; return;
} }
String index = indexDefinition.getFullIndexName();
if (index == null) {
throw new IllegalArgumentException("no index name given");
}
ensureClientIsPresent(); ensureClientIsPresent();
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE)
.setIndex(index); .setIndex(index);
if (settings != null) { if (indexDefinition.getSettings() == null) {
createIndexRequestBuilder.setSettings(settings); XContentBuilder builder = JsonXContent.contentBuilder()
.startObject()
.startObject("index")
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject();
indexDefinition.setSettings(Strings.toString(builder));
} }
if (builder != null) { Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build();
createIndexRequestBuilder.addMapping(TYPE_NAME, builder); createIndexRequestBuilder.setSettings(settings);
logger.debug("adding mapping = {}", Strings.toString(builder)); Map<String, ?> mappings = indexDefinition.getMappings() == null ? null :
JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered();
if (mappings != null) {
createIndexRequestBuilder.addMapping(TYPE_NAME, mappings);
} else { } else {
createIndexRequestBuilder.addMapping(TYPE_NAME, createIndexRequestBuilder.addMapping(TYPE_NAME,
JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject()); JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject());
logger.debug("empty mapping");
} }
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
if (createIndexResponse.isAcknowledged()) { if (createIndexResponse.isAcknowledged()) {
logger.info("index {} created", index); logger.info("index {} created", index);
} else { } else {
logger.warn("index creation of {} not acknowledged", index); logger.warn("index creation of {} not acknowledged", index);
return;
} }
refreshIndex(index); // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly.
logger.info("waiting for GREEN after index {} was created", index);
waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
} }
@Override @Override
public void startBulk(IndexDefinition indexDefinition) throws IOException { public void startBulk(IndexDefinition indexDefinition) throws IOException {
startBulk(indexDefinition.getFullIndexName(), -1, 1); if (!ensureIndexDefinition(indexDefinition)) {
return;
} }
@Override
public void startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
if (bulkController != null) { if (bulkController != null) {
ensureClientIsPresent(); ensureClientIsPresent();
bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); bulkController.startBulkMode(indexDefinition);
} }
} }
@Override @Override
public void stopBulk(IndexDefinition indexDefinition) throws IOException { public void stopBulk(IndexDefinition indexDefinition) throws IOException {
if (!ensureIndexDefinition(indexDefinition)) {
return;
}
if (bulkController != null) { if (bulkController != null) {
ensureClientIsPresent(); ensureClientIsPresent();
bulkController.stopBulkMode(indexDefinition); bulkController.stopBulkMode(indexDefinition);
@ -158,22 +145,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source) {
if (bulkController != null) { return index(indexDefinition, id, create,
ensureClientIsPresent(); new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
bulkController.stopBulkMode(index, timeout, timeUnit);
}
} }
@Override @Override
public BulkClient index(String index, String id, boolean create, String source) { public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) {
return index(new IndexRequest().index(index).id(id).create(create) if (!ensureIndexDefinition(indexDefinition)) {
.source(new BytesArray(source.getBytes(StandardCharsets.UTF_8)), XContentType.JSON)); return this;
} }
return index(new IndexRequest().index(indexDefinition.getFullIndexName()).id(id).create(create)
@Override
public BulkClient index(String index, String id, boolean create, BytesReference source) {
return index(new IndexRequest().index(index).id(id).create(create)
.source(source, XContentType.JSON)); .source(source, XContentType.JSON));
} }
@ -187,8 +169,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public BulkClient delete(String index, String id) { public BulkClient delete(IndexDefinition indexDefinition, String id) {
return delete(new DeleteRequest().index(index).id(id)); if (!ensureIndexDefinition(indexDefinition)) {
return this;
}
return delete(new DeleteRequest().index(indexDefinition.getFullIndexName()).id(id));
} }
@Override @Override
@ -201,21 +186,25 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public BulkClient update(String index, String id, BytesReference source) { public BulkClient update(IndexDefinition indexDefinition, String id, String source) {
return update(new UpdateRequest().index(index).id(id) return update(indexDefinition, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8)));
}
@Override
public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) {
if (!ensureIndexDefinition(indexDefinition)) {
return this;
}
return update(new UpdateRequest().index(indexDefinition.getFullIndexName()).id(id)
.doc(source, XContentType.JSON)); .doc(source, XContentType.JSON));
} }
@Override
public BulkClient update(String index, String id, String source) {
return update(new UpdateRequest().index(index).id(id)
.doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON));
}
@Override @Override
public BulkClient update(UpdateRequest updateRequest) { public BulkClient update(UpdateRequest updateRequest) {
if (bulkController != null) {
ensureClientIsPresent(); ensureClientIsPresent();
bulkController.bulkUpdate(updateRequest); bulkController.bulkUpdate(updateRequest);
}
return this; return this;
} }
@ -231,18 +220,20 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
} }
@Override @Override
public void flushIndex(String index) { public void flushIndex(IndexDefinition indexDefinition) {
if (index != null) { if (!ensureIndexDefinition(indexDefinition)) {
ensureClientIsPresent(); return;
client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
} }
ensureClientIsPresent();
client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet();
} }
@Override @Override
public void refreshIndex(String index) { public void refreshIndex(IndexDefinition indexDefinition) {
if (index != null) { if (!ensureIndexDefinition(indexDefinition)) {
return;
}
ensureClientIsPresent(); ensureClientIsPresent();
client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet();
}
} }
} }

View file

@ -39,7 +39,7 @@ public class ClientBuilder {
} }
public ClientBuilder(ElasticsearchClient client) { public ClientBuilder(ElasticsearchClient client) {
this(client, Thread.currentThread().getContextClassLoader()); this(client, ClassLoader.getSystemClassLoader());
} }
public ClientBuilder(ElasticsearchClient client, ClassLoader classLoader) { public ClientBuilder(ElasticsearchClient client, ClassLoader classLoader) {
@ -47,6 +47,17 @@ public class ClientBuilder {
this.classLoader = classLoader; this.classLoader = classLoader;
this.settingsBuilder = Settings.builder(); this.settingsBuilder = Settings.builder();
settingsBuilder.put("node.name", "elx-client-" + Version.CURRENT); settingsBuilder.put("node.name", "elx-client-" + Version.CURRENT);
for (Parameters p : Parameters.values()) {
if (p.getType() == Boolean.class) {
settingsBuilder.put(p.getName(), p.getBoolean());
}
if (p.getType() == Integer.class) {
settingsBuilder.put(p.getName(), p.getInteger());
}
if (p.getType() == String.class) {
settingsBuilder.put(p.getName(), p.getString());
}
}
} }
public static ClientBuilder builder() { public static ClientBuilder builder() {

View file

@ -16,10 +16,6 @@ import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexDefinition;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -35,11 +31,7 @@ public class DefaultBulkController implements BulkController {
private BulkProcessor bulkProcessor; private BulkProcessor bulkProcessor;
private final List<String> indexNames; private BulkListener bulkListener;
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals;
private final long maxWaitTime; private final long maxWaitTime;
@ -50,10 +42,7 @@ public class DefaultBulkController implements BulkController {
public DefaultBulkController(BulkClient bulkClient) { public DefaultBulkController(BulkClient bulkClient) {
this.bulkClient = bulkClient; this.bulkClient = bulkClient;
this.bulkMetric = new DefaultBulkMetric(); this.bulkMetric = new DefaultBulkMetric();
this.indexNames = new ArrayList<>();
this.active = new AtomicBoolean(false); this.active = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
this.stopBulkRefreshIntervals = new HashMap<>();
this.maxWaitTime = 30L; this.maxWaitTime = 30L;
this.maxWaitTimeUnit = TimeUnit.SECONDS; this.maxWaitTimeUnit = TimeUnit.SECONDS;
} }
@ -71,31 +60,38 @@ public class DefaultBulkController implements BulkController {
@Override @Override
public void init(Settings settings) { public void init(Settings settings) {
bulkMetric.init(settings); bulkMetric.init(settings);
int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); Parameters.MAX_ACTIONS_PER_REQUEST.getInteger());
int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(),
Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); Parameters.MAX_CONCURRENT_REQUESTS.getInteger());
TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(),
TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); Parameters.FLUSH_INTERVAL.getString());
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), TimeValue flushIngestInterval = TimeValue.parseTimeValue(flushIngestIntervalStr,
ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), TimeValue.timeValueSeconds(30), "");
"maxVolumePerRequest")); ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(),
boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m"));
Parameters.ENABLE_BULK_LOGGING.getValue()); boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(),
BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); Parameters.ENABLE_BULK_LOGGING.getBoolean());
boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(),
Parameters.FAIL_ON_BULK_ERROR.getBoolean());
this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging, failOnBulkError);
this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener)
.setBulkActions(maxActionsPerRequest) .setBulkActions(maxActionsPerRequest)
.setConcurrentRequests(maxConcurrentRequests) .setConcurrentRequests(maxConcurrentRequests)
.setFlushInterval(flushIngestInterval) .setFlushInterval(flushIngestInterval)
.setBulkSize(maxVolumePerRequest) .setBulkSize(maxVolumePerRequest)
.build(); .build();
this.active.set(true);
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {} bulk logging = {} logger debug = {} from settings = {}", "flushIngestInterval = {} maxVolumePerRequest = {} " +
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest, "bulk logging = {} fail on bulk error = {} " +
enableBulkLogging, logger.isDebugEnabled(), settings.toDelimitedString(',')); "logger debug = {} from settings = {}",
maxActionsPerRequest, maxConcurrentRequests,
flushIngestInterval, maxVolumePerRequest,
enableBulkLogging, failOnBulkError,
logger.isDebugEnabled(), settings.toDelimitedString(','));
} }
this.active.set(true);
} }
@Override @Override
@ -105,24 +101,13 @@ public class DefaultBulkController implements BulkController {
@Override @Override
public void startBulkMode(IndexDefinition indexDefinition) throws IOException { public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(), String indexName = indexDefinition.getFullIndexName();
indexDefinition.getStopRefreshInterval()); if (indexDefinition.getStartBulkRefreshSeconds() != 0) {
} bulkClient.updateIndexSetting(indexName, "refresh_interval",
indexDefinition.getStartBulkRefreshSeconds() + "s",
@Override
public void startBulkMode(String indexName,
long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException {
if (!indexNames.contains(indexName)) {
indexNames.add(indexName);
startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds);
stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds);
if (startRefreshIntervalInSeconds != 0L) {
bulkClient.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s",
30L, TimeUnit.SECONDS); 30L, TimeUnit.SECONDS);
} }
} }
}
@Override @Override
public void bulkIndex(IndexRequest indexRequest) { public void bulkIndex(IndexRequest indexRequest) {
@ -169,32 +154,31 @@ public class DefaultBulkController implements BulkController {
@Override @Override
public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) {
try { try {
if (bulkProcessor != null) {
bulkProcessor.flush();
return bulkProcessor.awaitFlush(timeout, timeUnit); return bulkProcessor.awaitFlush(timeout, timeUnit);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.error("interrupted"); logger.error("interrupted");
return false; return false;
} catch (IOException e) {
logger.error(e.getMessage(), e);
return false;
} }
return false;
} }
@Override @Override
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
stopBulkMode(indexDefinition.getFullIndexName(),
indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
}
@Override
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
flush(); flush();
if (waitForBulkResponses(timeout, timeUnit)) { if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) {
if (indexNames.contains(index)) { if (indexDefinition.getStopBulkRefreshSeconds() != 0) {
Long secs = stopBulkRefreshIntervals.get(index); bulkClient.updateIndexSetting(indexDefinition.getFullIndexName(),
if (secs != null && secs != 0L) { "refresh_interval",
bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", indexDefinition.getStopBulkRefreshSeconds() + "s",
30L, TimeUnit.SECONDS); 30L, TimeUnit.SECONDS);
} }
indexNames.remove(index);
}
} }
} }
@ -209,15 +193,7 @@ public class DefaultBulkController implements BulkController {
public void close() throws IOException { public void close() throws IOException {
flush(); flush();
bulkMetric.close(); bulkMetric.close();
if (bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit);
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L)
bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s",
30L, TimeUnit.SECONDS);
}
indexNames.clear();
}
if (bulkProcessor != null) { if (bulkProcessor != null) {
bulkProcessor.close(); bulkProcessor.close();
} }

View file

@ -19,14 +19,18 @@ public class DefaultBulkListener implements BulkListener {
private final boolean isBulkLoggingEnabled; private final boolean isBulkLoggingEnabled;
private final boolean failOnError;
private Throwable lastBulkError; private Throwable lastBulkError;
public DefaultBulkListener(BulkController bulkController, public DefaultBulkListener(BulkController bulkController,
BulkMetric bulkMetric, BulkMetric bulkMetric,
boolean isBulkLoggingEnabled) { boolean isBulkLoggingEnabled,
boolean failOnError) {
this.bulkController = bulkController; this.bulkController = bulkController;
this.bulkMetric = bulkMetric; this.bulkMetric = bulkMetric;
this.isBulkLoggingEnabled = isBulkLoggingEnabled; this.isBulkLoggingEnabled = isBulkLoggingEnabled;
this.failOnError = failOnError;
} }
@Override @Override
@ -74,6 +78,10 @@ public class DefaultBulkListener implements BulkListener {
logger.error("bulk [{}] failed with {} failed items, failure message = {}", logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage()); executionId, n, response.buildFailureMessage());
} }
if (failOnError) {
throw new IllegalStateException("bulk failed: id = " + executionId +
" n = " + n + " message = " + response.buildFailureMessage());
}
} else { } else {
bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length);
} }

View file

@ -2,51 +2,54 @@ package org.xbib.elx.common;
public enum Parameters { public enum Parameters {
ENABLE_BULK_LOGGING(false), MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30),
DEFAULT_MAX_ACTIONS_PER_REQUEST(1000), START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0),
DEFAULT_MAX_CONCURRENT_REQUESTS(Runtime.getRuntime().availableProcessors()), STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30),
DEFAULT_MAX_VOLUME_PER_REQUEST("10mb"), ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true),
DEFAULT_FLUSH_INTERVAL(30), FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true),
MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"), MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000),
MAX_CONCURRENT_REQUESTS("max_concurrent_requests"), // 0 = 1 CPU, synchronous requests, &gt; 0 = n + 1 CPUs, asynchronous requests
MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1),
MAX_VOLUME_PER_REQUEST("max_volume_per_request"), MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1mb"),
FLUSH_INTERVAL("flush_interval"); FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s");
boolean value; private final String name;
int num; private final Class<?> type;
String string; private final Object value;
Parameters(boolean value) { Parameters(String name, Class<?> type, Object value) {
this.name = name;
this.type = type;
this.value = value; this.value = value;
} }
Parameters(int num) { public String getName() {
this.num = num; return name;
} }
Parameters(String string) { public Class<?> getType() {
this.string = string; return type;
} }
public boolean getValue() { public Boolean getBoolean() {
return value; return type == Boolean.class ? (Boolean) value : Boolean.FALSE;
} }
public int getNum() { public Integer getInteger() {
return num; return type == Integer.class ? (Integer) value : 0;
} }
public String getString() { public String getString() {
return string; return type == String.class ? (String) value : null;
} }
} }