From 6c48d23de104eb6fc8d05cdaebc0d8b04e25dc12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 14 Apr 2021 18:55:56 +0200 Subject: [PATCH] align with es221 --- .../java/org/xbib/elx/api/AdminClient.java | 66 +------ .../java/org/xbib/elx/api/BasicClient.java | 4 +- .../java/org/xbib/elx/api/BulkClient.java | 106 +++------- .../java/org/xbib/elx/api/BulkController.java | 5 - .../org/xbib/elx/api/IndexDefinition.java | 20 +- elx-common/build.gradle | 2 +- .../xbib/elx/common/AbstractAdminClient.java | 182 +++++------------- .../xbib/elx/common/AbstractBasicClient.java | 31 +-- .../xbib/elx/common/AbstractBulkClient.java | 151 +++++++-------- .../org/xbib/elx/common/ClientBuilder.java | 13 +- .../elx/common/DefaultBulkController.java | 112 +++++------ .../xbib/elx/common/DefaultBulkListener.java | 10 +- .../java/org/xbib/elx/common/Parameters.java | 47 ++--- 13 files changed, 265 insertions(+), 484 deletions(-) diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index d97a044..af95ea6 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -1,7 +1,5 @@ package org.xbib.elx.api; -import org.elasticsearch.common.settings.Settings; - import java.io.IOException; import java.util.List; import java.util.Map; @@ -12,19 +10,9 @@ import java.util.concurrent.TimeUnit; */ public interface AdminClient extends BasicClient { - /** - * Build index definition from settings. - * - * @param index the index name - * @param settings the settings for the index - * @return index definition - * @throws IOException if settings/mapping URL is invalid/malformed - */ - IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException; + Map getMapping(IndexDefinition indexDefinition) throws IOException; - Map getMapping(String index) throws IOException; - - void checkMapping(String index); + void checkMapping(IndexDefinition indexDefinition); /** * Delete an index. @@ -33,15 +21,6 @@ public interface AdminClient extends BasicClient { */ AdminClient deleteIndex(IndexDefinition indexDefinition); - /** - * Delete an index. - * - * @param index index - * @return this - */ - AdminClient deleteIndex(String index); - - /** * Update replica level. * @param indexDefinition the index definition @@ -51,18 +30,6 @@ public interface AdminClient extends BasicClient { */ AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException; - /** - * Update replica level. - * - * @param index index - * @param level the replica level - * @param maxWaitTime maximum wait time - * @param timeUnit time unit - * @return this - * @throws IOException if replica setting could not be updated - */ - AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException; - /** * Get replica level. * @param indexDefinition the index name @@ -70,13 +37,6 @@ public interface AdminClient extends BasicClient { */ int getReplicaLevel(IndexDefinition indexDefinition); - /** - * Get replica level. - * @param index the index name - * @return the replica level of the index - */ - int getReplicaLevel(String index); - /** * Force segment merge of an index. * @param indexDefinition the index definition @@ -84,15 +44,6 @@ public interface AdminClient extends BasicClient { */ boolean forceMerge(IndexDefinition indexDefinition); - /** - * Force segment merge of an index. - * @param index the index - * @param maxWaitTime maximum wait time - * @param timeUnit time unit - * @return this - */ - boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit); - /** * Resolve alias. * @@ -117,15 +68,6 @@ public interface AdminClient extends BasicClient { */ Map getAliases(String index); - /** - * Shift from one index to another. - * @param indexDefinition the index definition - * @param additionalAliases new aliases - * @return this - */ - IndexShiftResult shiftIndex(IndexDefinition indexDefinition, - List additionalAliases); - /** * Shift from one index to another. * @param indexDefinition the index definition @@ -147,10 +89,10 @@ public interface AdminClient extends BasicClient { /** * Find the timestamp of the most recently indexed document in the index. * - * @param index the index name + * @param indexDefinition the index definition * @param timestampfieldname the timestamp field name * @return millis UTC millis of the most recent document * @throws IOException if most rcent document can not be found */ - Long mostRecentDocument(String index, String timestampfieldname) throws IOException; + Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) throws IOException; } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index 9b9b4b5..54b8b83 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -56,7 +56,7 @@ public interface BasicClient extends Closeable { void waitForShards(long maxWaitTime, TimeUnit timeUnit); - long getSearchableDocs(String index); + long getSearchableDocs(IndexDefinition indexDefinition); - boolean isIndexExists(String index); + boolean isIndexExists(IndexDefinition indexDefinition); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index b12f49a..5f2a18a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -4,11 +4,8 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.Flushable; import java.io.IOException; -import java.util.Map; import java.util.concurrent.TimeUnit; public interface BulkClient extends BasicClient, Flushable { @@ -19,15 +16,6 @@ public interface BulkClient extends BasicClient, Flushable { */ BulkController getBulkController(); - - /** - * Create a new index. - * - * @param index index - * @throws IOException if new index creation fails - */ - void newIndex(String index) throws IOException; - /** * Create a new index. * @param indexDefinition the index definition @@ -36,57 +24,43 @@ public interface BulkClient extends BasicClient, Flushable { void newIndex(IndexDefinition indexDefinition) throws IOException; /** - * Create a new index. - * - * @param index index - * @param settings settings - * @throws IOException if settings is invalid or index creation fails + * Start bulk mode for indexes. + * @param indexDefinition index definition + * @throws IOException if bulk could not be started */ - void newIndex(String index, Settings settings) throws IOException; + void startBulk(IndexDefinition indexDefinition) throws IOException; /** - * Create a new index. + * Stop bulk mode. * - * @param index index - * @param settings settings - * @param mapping mapping - * @throws IOException if settings/mapping is invalid or index creation fails + * @param indexDefinition index definition + * @throws IOException if bulk could not be startet */ - void newIndex(String index, Settings settings, XContentBuilder mapping) throws IOException; - - /** - * Create a new index. - * - * @param index index - * @param settings settings - * @param mapping mapping - * @throws IOException if settings/mapping is invalid or index creation fails - */ - void newIndex(String index, Settings settings, Map mapping) throws IOException; + void stopBulk(IndexDefinition indexDefinition) throws IOException; /** * Add index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param index the index + * @param indexDefinition the index definition * @param id the id * @param create true if document must be created * @param source the source * @return this */ - BulkClient index(String index, String id, boolean create, BytesReference source); + BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source); /** * Index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param index the index + * @param indexDefinition the index definition * @param id the id * @param create true if document is to be created, false otherwise * @param source the source * @return this client methods */ - BulkClient index(String index, String id, boolean create, String source); + BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source); /** * Index request. Each request will be added to a queue for bulking requests. @@ -100,11 +74,11 @@ public interface BulkClient extends BasicClient, Flushable { /** * Delete request. * - * @param index the index + * @param indexDefinition the index definition * @param id the id * @return this */ - BulkClient delete(String index, String id); + BulkClient delete(IndexDefinition indexDefinition, String id); /** * Delete request. Each request will be added to a queue for bulking requests. @@ -120,22 +94,22 @@ public interface BulkClient extends BasicClient, Flushable { * Submitting request will be done when bulk limits are exceeded. * Note that updates only work correctly when all operations between nodes are synchronized. * - * @param index the index + * @param indexDefinition the index definition * @param id the id * @param source the source * @return this */ - BulkClient update(String index, String id, BytesReference source); + BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source); /** * Update document. Use with precaution! Does not work in all cases. * - * @param index the index + * @param indexDefinition the index definition * @param id the id * @param source the source * @return this */ - BulkClient update(String index, String id, String source); + BulkClient update(IndexDefinition indexDefinition, String id, String source); /** * Bulked update request. Each request will be added to a queue for bulking requests. @@ -147,42 +121,6 @@ public interface BulkClient extends BasicClient, Flushable { */ BulkClient update(UpdateRequest updateRequest); - /** - * Start bulk mode for indexes. - * @param indexDefinition index definition - * @throws IOException if bulk could not be started - */ - void startBulk(IndexDefinition indexDefinition) throws IOException; - - /** - * Start bulk mode. - * - * @param index index - * @param startRefreshIntervalSeconds refresh interval before bulk - * @param stopRefreshIntervalSeconds refresh interval after bulk - * @throws IOException if bulk could not be started - */ - void startBulk(String index, long startRefreshIntervalSeconds, - long stopRefreshIntervalSeconds) throws IOException; - - /** - * Stop bulk mode. - * - * @param indexDefinition index definition - * @throws IOException if bulk could not be startet - */ - void stopBulk(IndexDefinition indexDefinition) throws IOException; - - /** - * Stops bulk mode. - * - * @param index index - * @param timeout maximum wait time - * @param timeUnit time unit for timeout - * @throws IOException if bulk could not be stopped - */ - void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException; - /** * Wait for all outstanding bulk responses. * @@ -206,15 +144,15 @@ public interface BulkClient extends BasicClient, Flushable { /** * Refresh the index. * - * @param index index + * @param indexDefinition index definition */ - void refreshIndex(String index); + void refreshIndex(IndexDefinition indexDefinition); /** * Flush the index. The cluster clears cache and completes indexing. * - * @param index index + * @param indexDefinition index definition */ - void flushIndex(String index); + void flushIndex(IndexDefinition indexDefinition); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java index ec375eb..fe2bcce 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -22,9 +22,6 @@ public interface BulkController extends Closeable, Flushable { void startBulkMode(IndexDefinition indexDefinition) throws IOException; - void startBulkMode(String indexName, long startRefreshIntervalInSeconds, - long stopRefreshIntervalInSeconds) throws IOException; - void bulkIndex(IndexRequest indexRequest); void bulkDelete(DeleteRequest deleteRequest); @@ -34,6 +31,4 @@ public interface BulkController extends Closeable, Flushable { boolean waitForBulkResponses(long timeout, TimeUnit timeUnit); void stopBulkMode(IndexDefinition indexDefinition) throws IOException; - - void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException; } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 6ab4e8a..396ddf4 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -16,6 +16,10 @@ public interface IndexDefinition { String getIndex(); + IndexDefinition setType(String type); + + String getType(); + IndexDefinition setFullIndexName(String fullIndexName); String getFullIndexName(); @@ -36,6 +40,14 @@ public interface IndexDefinition { Pattern getDateTimePattern(); + IndexDefinition setStartBulkRefreshSeconds(int seconds); + + int getStartBulkRefreshSeconds(); + + IndexDefinition setStopBulkRefreshSeconds(int seconds); + + int getStopBulkRefreshSeconds(); + IndexDefinition setEnabled(boolean enabled); boolean isEnabled(); @@ -69,12 +81,4 @@ public interface IndexDefinition { long getMaxWaitTime(); TimeUnit getMaxWaitTimeUnit(); - - IndexDefinition setStartRefreshInterval(long seconds); - - long getStartRefreshInterval(); - - IndexDefinition setStopRefreshInterval(long seconds); - - long getStopRefreshInterval(); } diff --git a/elx-common/build.gradle b/elx-common/build.gradle index b79e852..26cbdd9 100644 --- a/elx-common/build.gradle +++ b/elx-common/build.gradle @@ -1,4 +1,4 @@ -dependencies{ +dependencies { api project(':elx-api') testImplementation "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}" testImplementation "io.netty:netty-codec-http:${project.property('netty.version')}" diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 9c6bb72..1fc3fa4 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -37,16 +37,8 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.DeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.common.xcontent.yaml.YamlXContent; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -58,16 +50,9 @@ import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.api.IndexShiftResult; import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.time.LocalDate; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -95,22 +80,24 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); @Override - public Map getMapping(String index) { + public Map getMapping(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return null; + } GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) - .setIndices(index) + .setIndices(indexDefinition.getFullIndexName()) .setTypes(TYPE_NAME); GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); - return getMappingsResponse.getMappings().get(index).get(TYPE_NAME).getSourceAsMap(); + return getMappingsResponse.getMappings() + .get(indexDefinition.getFullIndexName()) + .get(TYPE_NAME) + .getSourceAsMap(); } @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { - return deleteIndex(indexDefinition.getFullIndexName()); - } - - @Override - public AdminClient deleteIndex(String index) { ensureClientIsPresent(); + String index = indexDefinition.getFullIndexName(); if (index == null) { logger.warn("no index name given to delete index"); return this; @@ -123,27 +110,26 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { - return updateReplicaLevel(indexDefinition.getFullIndexName(), level, - indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); - } - - @Override - public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } if (level < 1) { logger.warn("invalid replica level"); return this; } + String index = indexDefinition.getFullIndexName(); + long maxWaitTime = indexDefinition.getMaxWaitTime(); + TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); return this; } @Override public int getReplicaLevel(IndexDefinition indexDefinition) { - return getReplicaLevel(indexDefinition.getFullIndexName()); - } - - @Override - public int getReplicaLevel(String index) { + if (!ensureIndexDefinition(indexDefinition)) { + return -1; + } + String index = indexDefinition.getFullIndexName(); GetSettingsRequest request = new GetSettingsRequest().indices(index); GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet(); int replica = -1; @@ -158,10 +144,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public String resolveMostRecentIndex(String alias) { - ensureClientIsPresent(); if (alias == null) { return null; } + ensureClientIsPresent(); GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias); GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); @@ -211,12 +197,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements .sorted().collect(Collectors.toList()); } - @Override - public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, - List additionalAliases) { - return shiftIndex(indexDefinition, additionalAliases, null); - } - @Override public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases, @@ -381,7 +361,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public Long mostRecentDocument(String index, String timestampfieldname) { + public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) { + if (!ensureIndexDefinition(indexDefinition)) { + return -1L; + } ensureClientIsPresent(); SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); SearchSourceBuilder builder = new SearchSourceBuilder(); @@ -389,7 +372,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements builder.storedField(timestampfieldname); builder.size(1); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(index); + searchRequest.indices(indexDefinition.getFullIndexName()); searchRequest.source(builder); SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); @@ -401,25 +384,25 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return 0L; } } - return null; + // almost impossible + return -1L; } @Override public boolean forceMerge(IndexDefinition indexDefinition) { - if (indexDefinition.isForceMergeEnabled()) { - return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime(), - indexDefinition.getMaxWaitTimeUnit()); + if (!ensureIndexDefinition(indexDefinition)) { + return false; } - return false; - } - - @Override - public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); + if (!indexDefinition.isForceMergeEnabled()) { + return false; + } + ensureClientIsPresent(); + String index = indexDefinition.getFullIndexName(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); forceMergeRequest.indices(index); try { - client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).get(timeout.getMillis(), TimeUnit.MILLISECONDS); + client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) + .get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); return true; } catch (TimeoutException e) { logger.error("timeout"); @@ -432,45 +415,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return false; } - @Override - public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) - throws IOException { - boolean isEnabled = settings.getAsBoolean("enabled", false); - String indexName = settings.get("name", index); - String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); - Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); - String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd"); - DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat) - .withZone(ZoneId.systemDefault()); - String fullName = indexName + dateTimeFormatter.format(LocalDate.now()); - String fullIndexName = resolveAlias(fullName).stream().findFirst().orElse(fullName); - IndexRetention indexRetention = new DefaultIndexRetention() - .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) - .setDelta(settings.getAsInt("retention.delta", 0)); - return new DefaultIndexDefinition() - .setEnabled(isEnabled) - .setIndex(indexName) - .setFullIndexName(fullIndexName) - .setSettings(findSettingsFrom(settings.get("settings"))) - .setMappings(findMappingsFrom(settings.get("mapping"))) - .setDateTimeFormatter(dateTimeFormatter) - .setDateTimePattern(dateTimePattern) - .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) - .setShift(settings.getAsBoolean("shift", true)) - .setPrune(settings.getAsBoolean("prune", true)) - .setReplicaLevel(settings.getAsInt("replica", 0)) - .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) - .setRetention(indexRetention) - .setStartRefreshInterval(settings.getAsLong("bulk.startrefreshinterval", -1L)) - .setStopRefreshInterval(settings.getAsLong("bulk.stoprefreshinterval", -1L)); - } - @Override public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { - ensureClientIsPresent(); if (index == null) { throw new IOException("no index name given"); } + ensureClientIsPresent(); Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) @@ -479,65 +429,20 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public void checkMapping(String index) { + public void checkMapping(IndexDefinition indexDefinition) { ensureClientIsPresent(); - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(indexDefinition.getFullIndexName()); GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); ImmutableOpenMap> map = getMappingsResponse.getMappings(); map.keys().forEach((Consumer>) stringObjectCursor -> { ImmutableOpenMap mappings = map.get(stringObjectCursor.value); for (ObjectObjectCursor cursor : mappings) { MappingMetadata mappingMetaData = cursor.value; - checkMapping(index, mappingMetaData); + checkMapping(indexDefinition.getFullIndexName(), mappingMetaData); } }); } - private static String findSettingsFrom(String string) throws IOException { - if (string == null) { - return null; - } - try { - URL url = new URL(string); - try (InputStream inputStream = url.openStream()) { - Settings settings = Settings.builder().loadFromStream(string, inputStream, true).build(); - XContentBuilder builder = JsonXContent.contentBuilder(); - settings.toXContent(builder, ToXContent.EMPTY_PARAMS); - return Strings.toString(builder); - } - } catch (MalformedURLException e) { - return string; - } - } - - private static String findMappingsFrom(String string) throws IOException { - if (string == null) { - return null; - } - try { - URL url = new URL(string); - try (InputStream inputStream = url.openStream()) { - if (string.endsWith(".json")) { - Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered(); - XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject().map(mappings).endObject(); - return Strings.toString(builder); - } - if (string.endsWith(".yml") || string.endsWith(".yaml")) { - Map mappings = YamlXContent.yamlXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered(); - XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject().map(mappings).endObject(); - return Strings.toString(builder); - } - } - return string; - } catch (MalformedURLException e) { - return string; - } - } - private Map getFilters(GetAliasesResponse getAliasesResponse) { Map result = new HashMap<>(); for (ObjectObjectCursor> object : getAliasesResponse.getAliases()) { @@ -630,8 +535,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements .setQuery(queryBuilder) .setSize(0) .setTrackTotalHits(true); - SearchResponse searchResponse = - searchRequestBuilder.execute().actionGet(); + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); fields.put(path, searchResponse.getHits().getTotalHits().value); } } @@ -783,7 +687,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public String toString() { - return "NOTHING TO DO"; + return "EMPTY PRUNE"; } } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index bd020c3..dc6a323 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.xbib.elx.api.BasicClient; +import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,8 +59,6 @@ public abstract class AbstractBasicClient implements BasicClient { logger.log(Level.INFO, "initializing client with settings = " + settings.toDelimitedString(',')); this.settings = settings; setClient(createClient(settings)); - } else { - logger.log(Level.WARN, "not initializing client"); } } @@ -88,8 +87,11 @@ public abstract class AbstractBasicClient implements BasicClient { ensureClientIsPresent(); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + .timeout(timeout) + .waitForStatus(status); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { String message = "timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name(); logger.error(message); @@ -100,8 +102,8 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); + logger.info("waiting for cluster shard settling"); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - logger.log(Level.DEBUG, "waiting " + timeout + " for shard settling down"); ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() .waitForNoInitializingShards(true) .waitForNoRelocatingShards(true) @@ -109,7 +111,7 @@ public abstract class AbstractBasicClient implements BasicClient { ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); if (healthResponse.isTimedOut()) { - String message = "timeout while waiting for cluster shards: " + timeout; + String message = "timeout waiting for cluster shards: " + timeout; logger.error(message); throw new IllegalStateException(message); } @@ -137,9 +139,9 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public long getSearchableDocs(String index) { + public long getSearchableDocs(IndexDefinition indexDefinition) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) - .setIndices(index) + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()) .setSize(0) .setTrackTotalHits(true); @@ -147,15 +149,14 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public boolean isIndexExists(String index) { + public boolean isIndexExists(IndexDefinition indexDefinition) { IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); - indicesExistsRequest.indices(index); + indicesExistsRequest.indices(indexDefinition.getFullIndexName()); IndicesExistsResponse indicesExistsResponse = client.execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); return indicesExistsResponse.isExists(); } - @Override public void close() throws IOException { ensureClientIsPresent(); @@ -192,6 +193,14 @@ public abstract class AbstractBasicClient implements BasicClient { } } + protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) { + if (!indexDefinition.isEnabled()) { + logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); + return false; + } + return true; + } + protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) { switch (timeUnit) { case DAYS: diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 7719f9d..62893c8 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -79,78 +79,65 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void newIndex(IndexDefinition indexDefinition) throws IOException { - Settings settings = indexDefinition.getSettings() == null ? null : - Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build(); - Map mappings = indexDefinition.getMappings() == null ? null : - JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered(); - newIndex(indexDefinition.getFullIndexName(), settings, mappings); - } - - @Override - public void newIndex(String index) throws IOException { - newIndex(index, Settings.EMPTY, (Map) null); - } - - @Override - public void newIndex(String index, Settings settings) throws IOException { - newIndex(index, settings, (Map) null); - } - - @Override - public void newIndex(String index, Settings settings, Map mapping) throws IOException { - if (mapping == null || mapping.isEmpty()) { - newIndex(index, settings, (XContentBuilder) null); - } else { - newIndex(index, settings, JsonXContent.contentBuilder().map(mapping)); - } - } - - @Override - public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { - if (index == null) { - logger.warn("unable to create index, no index name given"); + if (!ensureIndexDefinition(indexDefinition)) { return; } + String index = indexDefinition.getFullIndexName(); + if (index == null) { + throw new IllegalArgumentException("no index name given"); + } ensureClientIsPresent(); CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) .setIndex(index); - if (settings != null) { - createIndexRequestBuilder.setSettings(settings); + if (indexDefinition.getSettings() == null) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("index") + .field("number_of_shards", 1) + .field("number_of_replicas", 0) + .endObject() + .endObject(); + indexDefinition.setSettings(Strings.toString(builder)); } - if (builder != null) { - createIndexRequestBuilder.addMapping(TYPE_NAME, builder); - logger.debug("adding mapping = {}", Strings.toString(builder)); + Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build(); + createIndexRequestBuilder.setSettings(settings); + Map mappings = indexDefinition.getMappings() == null ? null : + JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered(); + if (mappings != null) { + createIndexRequestBuilder.addMapping(TYPE_NAME, mappings); } else { createIndexRequestBuilder.addMapping(TYPE_NAME, JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject()); - logger.debug("empty mapping"); } CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); if (createIndexResponse.isAcknowledged()) { logger.info("index {} created", index); } else { logger.warn("index creation of {} not acknowledged", index); + return; } - refreshIndex(index); + // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly. + logger.info("waiting for GREEN after index {} was created", index); + waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); } @Override public void startBulk(IndexDefinition indexDefinition) throws IOException { - startBulk(indexDefinition.getFullIndexName(), -1, 1); - } - - @Override - public void startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) - throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return; + } if (bulkController != null) { ensureClientIsPresent(); - bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); + bulkController.startBulkMode(indexDefinition); } } @Override public void stopBulk(IndexDefinition indexDefinition) throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return; + } if (bulkController != null) { ensureClientIsPresent(); bulkController.stopBulkMode(indexDefinition); @@ -158,22 +145,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { - if (bulkController != null) { - ensureClientIsPresent(); - bulkController.stopBulkMode(index, timeout, timeUnit); + public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source) { + return index(indexDefinition, id, create, + new BytesArray(source.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) { + if (!ensureIndexDefinition(indexDefinition)) { + return this; } - } - - @Override - public BulkClient index(String index, String id, boolean create, String source) { - return index(new IndexRequest().index(index).id(id).create(create) - .source(new BytesArray(source.getBytes(StandardCharsets.UTF_8)), XContentType.JSON)); - } - - @Override - public BulkClient index(String index, String id, boolean create, BytesReference source) { - return index(new IndexRequest().index(index).id(id).create(create) + return index(new IndexRequest().index(indexDefinition.getFullIndexName()).id(id).create(create) .source(source, XContentType.JSON)); } @@ -187,8 +169,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public BulkClient delete(String index, String id) { - return delete(new DeleteRequest().index(index).id(id)); + public BulkClient delete(IndexDefinition indexDefinition, String id) { + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } + return delete(new DeleteRequest().index(indexDefinition.getFullIndexName()).id(id)); } @Override @@ -201,21 +186,25 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public BulkClient update(String index, String id, BytesReference source) { - return update(new UpdateRequest().index(index).id(id) + public BulkClient update(IndexDefinition indexDefinition, String id, String source) { + return update(indexDefinition, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) { + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } + return update(new UpdateRequest().index(indexDefinition.getFullIndexName()).id(id) .doc(source, XContentType.JSON)); } - @Override - public BulkClient update(String index, String id, String source) { - return update(new UpdateRequest().index(index).id(id) - .doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON)); - } - @Override public BulkClient update(UpdateRequest updateRequest) { - ensureClientIsPresent(); - bulkController.bulkUpdate(updateRequest); + if (bulkController != null) { + ensureClientIsPresent(); + bulkController.bulkUpdate(updateRequest); + } return this; } @@ -231,18 +220,20 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void flushIndex(String index) { - if (index != null) { - ensureClientIsPresent(); - client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet(); + public void flushIndex(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return; } + ensureClientIsPresent(); + client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet(); } @Override - public void refreshIndex(String index) { - if (index != null) { - ensureClientIsPresent(); - client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); + public void refreshIndex(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return; } + ensureClientIsPresent(); + client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index 6816713..49133a3 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -39,7 +39,7 @@ public class ClientBuilder { } public ClientBuilder(ElasticsearchClient client) { - this(client, Thread.currentThread().getContextClassLoader()); + this(client, ClassLoader.getSystemClassLoader()); } public ClientBuilder(ElasticsearchClient client, ClassLoader classLoader) { @@ -47,6 +47,17 @@ public class ClientBuilder { this.classLoader = classLoader; this.settingsBuilder = Settings.builder(); settingsBuilder.put("node.name", "elx-client-" + Version.CURRENT); + for (Parameters p : Parameters.values()) { + if (p.getType() == Boolean.class) { + settingsBuilder.put(p.getName(), p.getBoolean()); + } + if (p.getType() == Integer.class) { + settingsBuilder.put(p.getName(), p.getInteger()); + } + if (p.getType() == String.class) { + settingsBuilder.put(p.getName(), p.getString()); + } + } } public static ClientBuilder builder() { diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index b63e1c6..58d3c69 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -16,10 +16,6 @@ import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,11 +31,7 @@ public class DefaultBulkController implements BulkController { private BulkProcessor bulkProcessor; - private final List indexNames; - - private final Map startBulkRefreshIntervals; - - private final Map stopBulkRefreshIntervals; + private BulkListener bulkListener; private final long maxWaitTime; @@ -50,10 +42,7 @@ public class DefaultBulkController implements BulkController { public DefaultBulkController(BulkClient bulkClient) { this.bulkClient = bulkClient; this.bulkMetric = new DefaultBulkMetric(); - this.indexNames = new ArrayList<>(); this.active = new AtomicBoolean(false); - this.startBulkRefreshIntervals = new HashMap<>(); - this.stopBulkRefreshIntervals = new HashMap<>(); this.maxWaitTime = 30L; this.maxWaitTimeUnit = TimeUnit.SECONDS; } @@ -71,31 +60,38 @@ public class DefaultBulkController implements BulkController { @Override public void init(Settings settings) { bulkMetric.init(settings); - int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), - Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); - int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), - Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); - TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), - TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), - ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), - "maxVolumePerRequest")); - boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), - Parameters.ENABLE_BULK_LOGGING.getValue()); - BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); + int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), + Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); + int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), + Parameters.MAX_CONCURRENT_REQUESTS.getInteger()); + String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), + Parameters.FLUSH_INTERVAL.getString()); + TimeValue flushIngestInterval = TimeValue.parseTimeValue(flushIngestIntervalStr, + TimeValue.timeValueSeconds(30), ""); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); + boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), + Parameters.ENABLE_BULK_LOGGING.getBoolean()); + boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), + Parameters.FAIL_ON_BULK_ERROR.getBoolean()); + this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging, failOnBulkError); this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) .setFlushInterval(flushIngestInterval) .setBulkSize(maxVolumePerRequest) .build(); + this.active.set(true); if (logger.isInfoEnabled()) { logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + - "flushIngestInterval = {} maxVolumePerRequest = {} bulk logging = {} logger debug = {} from settings = {}", - maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest, - enableBulkLogging, logger.isDebugEnabled(), settings.toDelimitedString(',')); + "flushIngestInterval = {} maxVolumePerRequest = {} " + + "bulk logging = {} fail on bulk error = {} " + + "logger debug = {} from settings = {}", + maxActionsPerRequest, maxConcurrentRequests, + flushIngestInterval, maxVolumePerRequest, + enableBulkLogging, failOnBulkError, + logger.isDebugEnabled(), settings.toDelimitedString(',')); } - this.active.set(true); } @Override @@ -105,22 +101,11 @@ public class DefaultBulkController implements BulkController { @Override public void startBulkMode(IndexDefinition indexDefinition) throws IOException { - startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(), - indexDefinition.getStopRefreshInterval()); - } - - @Override - public void startBulkMode(String indexName, - long startRefreshIntervalInSeconds, - long stopRefreshIntervalInSeconds) throws IOException { - if (!indexNames.contains(indexName)) { - indexNames.add(indexName); - startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds); - stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds); - if (startRefreshIntervalInSeconds != 0L) { - bulkClient.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s", - 30L, TimeUnit.SECONDS); - } + String indexName = indexDefinition.getFullIndexName(); + if (indexDefinition.getStartBulkRefreshSeconds() != 0) { + bulkClient.updateIndexSetting(indexName, "refresh_interval", + indexDefinition.getStartBulkRefreshSeconds() + "s", + 30L, TimeUnit.SECONDS); } } @@ -169,31 +154,30 @@ public class DefaultBulkController implements BulkController { @Override public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { try { - return bulkProcessor.awaitFlush(timeout, timeUnit); + if (bulkProcessor != null) { + bulkProcessor.flush(); + return bulkProcessor.awaitFlush(timeout, timeUnit); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("interrupted"); return false; + } catch (IOException e) { + logger.error(e.getMessage(), e); + return false; } + return false; } @Override public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { - stopBulkMode(indexDefinition.getFullIndexName(), - indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); - } - - @Override - public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException { flush(); - if (waitForBulkResponses(timeout, timeUnit)) { - if (indexNames.contains(index)) { - Long secs = stopBulkRefreshIntervals.get(index); - if (secs != null && secs != 0L) { - bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", - 30L, TimeUnit.SECONDS); - } - indexNames.remove(index); + if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { + if (indexDefinition.getStopBulkRefreshSeconds() != 0) { + bulkClient.updateIndexSetting(indexDefinition.getFullIndexName(), + "refresh_interval", + indexDefinition.getStopBulkRefreshSeconds() + "s", + 30L, TimeUnit.SECONDS); } } } @@ -209,15 +193,7 @@ public class DefaultBulkController implements BulkController { public void close() throws IOException { flush(); bulkMetric.close(); - if (bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { - for (String index : indexNames) { - Long secs = stopBulkRefreshIntervals.get(index); - if (secs != null && secs != 0L) - bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", - 30L, TimeUnit.SECONDS); - } - indexNames.clear(); - } + bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); if (bulkProcessor != null) { bulkProcessor.close(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 572e717..2c191c8 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -19,14 +19,18 @@ public class DefaultBulkListener implements BulkListener { private final boolean isBulkLoggingEnabled; + private final boolean failOnError; + private Throwable lastBulkError; public DefaultBulkListener(BulkController bulkController, BulkMetric bulkMetric, - boolean isBulkLoggingEnabled) { + boolean isBulkLoggingEnabled, + boolean failOnError) { this.bulkController = bulkController; this.bulkMetric = bulkMetric; this.isBulkLoggingEnabled = isBulkLoggingEnabled; + this.failOnError = failOnError; } @Override @@ -74,6 +78,10 @@ public class DefaultBulkListener implements BulkListener { logger.error("bulk [{}] failed with {} failed items, failure message = {}", executionId, n, response.buildFailureMessage()); } + if (failOnError) { + throw new IllegalStateException("bulk failed: id = " + executionId + + " n = " + n + " message = " + response.buildFailureMessage()); + } } else { bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index bc7ccf7..e042904 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,51 +2,54 @@ package org.xbib.elx.common; public enum Parameters { - ENABLE_BULK_LOGGING(false), + MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30), - DEFAULT_MAX_ACTIONS_PER_REQUEST(1000), + START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0), - DEFAULT_MAX_CONCURRENT_REQUESTS(Runtime.getRuntime().availableProcessors()), + STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), - DEFAULT_MAX_VOLUME_PER_REQUEST("10mb"), + ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true), - DEFAULT_FLUSH_INTERVAL(30), + FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true), - MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"), + MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000), - MAX_CONCURRENT_REQUESTS("max_concurrent_requests"), + // 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests + MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1), - MAX_VOLUME_PER_REQUEST("max_volume_per_request"), + MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1mb"), - FLUSH_INTERVAL("flush_interval"); + FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"); - boolean value; + private final String name; - int num; + private final Class type; - String string; + private final Object value; - Parameters(boolean value) { + Parameters(String name, Class type, Object value) { + this.name = name; + this.type = type; this.value = value; } - Parameters(int num) { - this.num = num; + public String getName() { + return name; } - Parameters(String string) { - this.string = string; + public Class getType() { + return type; } - public boolean getValue() { - return value; + public Boolean getBoolean() { + return type == Boolean.class ? (Boolean) value : Boolean.FALSE; } - public int getNum() { - return num; + public Integer getInteger() { + return type == Integer.class ? (Integer) value : 0; } public String getString() { - return string; + return type == String.class ? (String) value : null; } }