diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index d02c2f2..0ac9d13 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -1,30 +1,17 @@ package org.xbib.elx.api; -import org.elasticsearch.common.settings.Settings; - import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** * Interface for extended managing and indexing methods of an Elasticsearch client. */ public interface AdminClient extends BasicClient { - /** - * Build index definition from settings. - * - * @param index the index name - * @param settings the settings for the index - * @return index definition - * @throws IOException if settings/mapping URL is invalid/malformed - */ - IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException; - - Map getMapping(String index) throws IOException; + Map getMapping(IndexDefinition indexDefinition) throws IOException; - void checkMapping(String index); + void checkMapping(IndexDefinition indexDefinition); /** * Delete an index. @@ -33,15 +20,6 @@ public interface AdminClient extends BasicClient { */ AdminClient deleteIndex(IndexDefinition indexDefinition); - /** - * Delete an index. - * - * @param index index - * @return this - */ - AdminClient deleteIndex(String index); - - /** * Update replica level. * @param indexDefinition the index definition @@ -51,18 +29,6 @@ public interface AdminClient extends BasicClient { */ AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException; - /** - * Update replica level. - * - * @param index index - * @param level the replica level - * @param maxWaitTime maximum wait time - * @param timeUnit time unit - * @return this - * @throws IOException if replica setting could not be updated - */ - AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException; - /** * Get replica level. * @param indexDefinition the index name @@ -70,13 +36,6 @@ public interface AdminClient extends BasicClient { */ int getReplicaLevel(IndexDefinition indexDefinition); - /** - * Get replica level. - * @param index the index name - * @return the replica level of the index - */ - int getReplicaLevel(String index); - /** * Force segment merge of an index. * @param indexDefinition the index definition @@ -84,15 +43,6 @@ public interface AdminClient extends BasicClient { */ boolean forceMerge(IndexDefinition indexDefinition); - /** - * Force segment merge of an index. - * @param index the index - * @param maxWaitTime maximum wait time - * @param timeUnit time unit - * @return this - */ - boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit); - /** * Resolve alias. * @@ -117,15 +67,6 @@ public interface AdminClient extends BasicClient { */ Map getAliases(String index); - /** - * Shift from one index to another. - * @param indexDefinition the index definition - * @param additionalAliases new aliases - * @return this - */ - IndexShiftResult shiftIndex(IndexDefinition indexDefinition, - List additionalAliases); - /** * Shift from one index to another. * @param indexDefinition the index definition @@ -148,10 +89,10 @@ public interface AdminClient extends BasicClient { /** * Find the timestamp of the most recently indexed document in the index. * - * @param index the index name + * @param indexDefinition the index * @param timestampfieldname the timestamp field name * @return millis UTC millis of the most recent document * @throws IOException if most rcent document can not be found */ - Long mostRecentDocument(String index, String timestampfieldname) throws IOException; + Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) throws IOException; } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index 9b9b4b5..7c96b35 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -56,7 +56,7 @@ public interface BasicClient extends Closeable { void waitForShards(long maxWaitTime, TimeUnit timeUnit); - long getSearchableDocs(String index); + long getSearchableDocs(IndexDefinition index); - boolean isIndexExists(String index); + boolean isIndexExists(IndexDefinition index); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 7368b46..1074ae6 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -23,31 +23,44 @@ public interface BulkClient extends BasicClient, Flushable { */ void newIndex(IndexDefinition indexDefinition) throws IOException; + /** + * Start bulk mode for indexes. + * @param indexDefinition index definition + * @throws IOException if bulk could not be started + */ + void startBulk(IndexDefinition indexDefinition) throws IOException; + + /** + * Stop bulk mode. + * + * @param indexDefinition index definition + * @throws IOException if bulk could not be startet + */ + void stopBulk(IndexDefinition indexDefinition) throws IOException; + /** * Add index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param index the index - * @param type the type + * @param indexDefinition the index definition * @param id the id * @param create true if document must be created * @param source the source * @return this */ - BulkClient index(String index, String type, String id, boolean create, BytesReference source); + BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source); /** * Index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param index the index - * @param type the type + * @param indexDefinition the index definition * @param id the id * @param create true if document is to be created, false otherwise * @param source the source * @return this client methods */ - BulkClient index(String index, String type, String id, boolean create, String source); + BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source); /** * Index request. Each request will be added to a queue for bulking requests. @@ -61,12 +74,11 @@ public interface BulkClient extends BasicClient, Flushable { /** * Delete request. * - * @param index the index - * @param type the type + * @param indexDefinition the index definition * @param id the id * @return this */ - BulkClient delete(String index, String type, String id); + BulkClient delete(IndexDefinition indexDefinition, String id); /** * Delete request. Each request will be added to a queue for bulking requests. @@ -82,24 +94,22 @@ public interface BulkClient extends BasicClient, Flushable { * Submitting request will be done when bulk limits are exceeded. * Note that updates only work correctly when all operations between nodes are synchronized. * - * @param index the index - * @param type the type + * @param indexDefinition the index definition * @param id the id * @param source the source * @return this */ - BulkClient update(String index, String type, String id, BytesReference source); + BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source); /** * Update document. Use with precaution! Does not work in all cases. * - * @param index the index - * @param type the type + * @param indexDefinition the index definition * @param id the id * @param source the source * @return this */ - BulkClient update(String index, String type, String id, String source); + BulkClient update(IndexDefinition indexDefinition, String id, String source); /** * Bulked update request. Each request will be added to a queue for bulking requests. @@ -111,42 +121,6 @@ public interface BulkClient extends BasicClient, Flushable { */ BulkClient update(UpdateRequest updateRequest); - /** - * Start bulk mode for indexes. - * @param indexDefinition index definition - * @throws IOException if bulk could not be started - */ - void startBulk(IndexDefinition indexDefinition) throws IOException; - - /** - * Start bulk mode. - * - * @param index index - * @param startRefreshIntervalSeconds refresh interval before bulk - * @param stopRefreshIntervalSeconds refresh interval after bulk - * @throws IOException if bulk could not be started - */ - void startBulk(String index, long startRefreshIntervalSeconds, - long stopRefreshIntervalSeconds) throws IOException; - - /** - * Stop bulk mode. - * - * @param indexDefinition index definition - * @throws IOException if bulk could not be startet - */ - void stopBulk(IndexDefinition indexDefinition) throws IOException; - - /** - * Stops bulk mode. - * - * @param index index - * @param timeout maximum wait time - * @param timeUnit time unit for timeout - * @throws IOException if bulk could not be stopped - */ - void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException; - /** * Wait for all outstanding bulk responses. * @@ -170,14 +144,14 @@ public interface BulkClient extends BasicClient, Flushable { /** * Refresh the index. * - * @param index index + * @param indexDefinition index definition */ - void refreshIndex(String index); + void refreshIndex(IndexDefinition indexDefinition); /** * Flush the index. The cluster clears cache and completes indexing. * - * @param index index + * @param indexDefinition index definition */ - void flushIndex(String index); + void flushIndex(IndexDefinition indexDefinition); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java index ae3e274..539f01b 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -22,9 +22,6 @@ public interface BulkController extends Closeable, Flushable { void startBulkMode(IndexDefinition indexDefinition) throws IOException; - void startBulkMode(String indexName, long startRefreshIntervalInSeconds, - long stopRefreshIntervalInSeconds) throws IOException; - void bulkIndex(IndexRequest indexRequest); void bulkDelete(DeleteRequest deleteRequest); @@ -35,6 +32,4 @@ public interface BulkController extends Closeable, Flushable { void stopBulkMode(IndexDefinition indexDefinition) throws IOException; - void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException; - } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index ebfea49..c207311 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -34,6 +34,14 @@ public interface IndexDefinition { Pattern getDateTimePattern(); + IndexDefinition setStartBulkRefreshSeconds(int seconds); + + int getStartBulkRefreshSeconds(); + + IndexDefinition setStopBulkRefreshSeconds(int seconds); + + int getStopBulkRefreshSeconds(); + IndexDefinition setEnabled(boolean enabled); boolean isEnabled(); @@ -67,12 +75,4 @@ public interface IndexDefinition { long getMaxWaitTime(); TimeUnit getMaxWaitTimeUnit(); - - IndexDefinition setStartRefreshInterval(long seconds); - - long getStartRefreshInterval(); - - IndexDefinition setStopRefreshInterval(long seconds); - - long getStopRefreshInterval(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 03fa256..7a35f9b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -41,11 +41,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.common.xcontent.yaml.YamlXContent; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -57,18 +52,10 @@ import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; -import org.xbib.elx.api.IndexRetention; import org.xbib.elx.api.IndexShiftResult; import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.charset.MalformedInputException; import java.nio.charset.StandardCharsets; -import java.time.LocalDate; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -94,30 +81,28 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); - /** - * The one and only index type name used in the extended client. - * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_". - */ - private static final String TYPE_NAME = "doc"; - @Override - public Map getMapping(String index) throws IOException { + public Map getMapping(IndexDefinition indexDefinition) throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return null; + } GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) - .setIndices(index) - .setTypes(TYPE_NAME); + .setIndices(indexDefinition.getFullIndexName()) + .setTypes(indexDefinition.getType()); GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); - logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(TYPE_NAME).getSourceAsMap()); - return getMappingsResponse.getMappings().get(index).get(TYPE_NAME).getSourceAsMap(); + return getMappingsResponse.getMappings() + .get(indexDefinition.getFullIndexName()) + .get(indexDefinition.getType()) + .getSourceAsMap(); } @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { - return deleteIndex(indexDefinition.getFullIndexName()); - } - - @Override - public AdminClient deleteIndex(String index) { + if (!ensureIndexDefinition(indexDefinition)) { + return null; + } ensureClientIsPresent(); + String index = indexDefinition.getFullIndexName(); if (index == null) { logger.warn("no index name given to delete index"); return this; @@ -130,27 +115,27 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { - return updateReplicaLevel(indexDefinition.getFullIndexName(), level, - indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); - } - - @Override - public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return null; + } if (level < 1) { logger.warn("invalid replica level"); return this; } + String index = indexDefinition.getFullIndexName(); + long maxWaitTime = indexDefinition.getMaxWaitTime(); + TimeUnit timeUnit = indexDefinition.getMaxWaitTimeUnit(); updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); return this; } @Override public int getReplicaLevel(IndexDefinition indexDefinition) { - return getReplicaLevel(indexDefinition.getFullIndexName()); - } - - @Override - public int getReplicaLevel(String index) { + if (!ensureIndexDefinition(indexDefinition)) { + return -1; + } + ensureClientIsPresent(); + String index = indexDefinition.getFullIndexName(); GetSettingsRequest request = new GetSettingsRequest().indices(index); GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet(); int replica = -1; @@ -165,10 +150,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public String resolveMostRecentIndex(String alias) { - ensureClientIsPresent(); if (alias == null) { return null; } + ensureClientIsPresent(); GetAliasesRequest getAliasesRequest = new GetAliasesRequest().aliases(alias); GetAliasesResponse getAliasesResponse = client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); @@ -187,6 +172,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (index == null) { return Collections.emptyMap(); } + ensureClientIsPresent(); GetAliasesRequest getAliasesRequest = new GetAliasesRequest().indices(index); return getFilters(client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet()); } @@ -210,12 +196,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return aliasOrIndex != null ? aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).collect(Collectors.toList()) : Collections.emptyList(); } - @Override - public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, - List additionalAliases) { - return shiftIndex(indexDefinition, additionalAliases, null); - } - @Override public IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List additionalAliases, @@ -223,6 +203,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (additionalAliases == null) { return new EmptyIndexShiftResult(); } + if (!ensureIndexDefinition(indexDefinition)) { + return new EmptyIndexShiftResult(); + } if (indexDefinition.isShiftEnabled()) { return shiftIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), additionalAliases.stream() @@ -380,7 +363,10 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public Long mostRecentDocument(String index, String timestampfieldname) { + public Long mostRecentDocument(IndexDefinition indexDefinition, String timestampfieldname) { + if (!ensureIndexDefinition(indexDefinition)) { + return -1L; + } ensureClientIsPresent(); SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); SearchSourceBuilder builder = new SearchSourceBuilder(); @@ -388,7 +374,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements builder.field(timestampfieldname); builder.size(1); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(index); + searchRequest.indices(indexDefinition.getFullIndexName()); searchRequest.source(builder); SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); if (searchResponse.getHits().getHits().length == 1) { @@ -399,25 +385,25 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return 0L; } } - return null; + // almost impossible + return -1L; } @Override public boolean forceMerge(IndexDefinition indexDefinition) { - if (indexDefinition.isForceMergeEnabled()) { - return forceMerge(indexDefinition.getFullIndexName(), indexDefinition.getMaxWaitTime(), - indexDefinition.getMaxWaitTimeUnit()); + if (!ensureIndexDefinition(indexDefinition)) { + return false; } - return false; - } - - @Override - public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); + if (!indexDefinition.isForceMergeEnabled()) { + return false; + } + ensureClientIsPresent(); + String index = indexDefinition.getFullIndexName(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); forceMergeRequest.indices(index); try { - client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).get(timeout.getMillis(), TimeUnit.MILLISECONDS); + client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) + .get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); return true; } catch (TimeoutException e) { logger.error("timeout"); @@ -430,45 +416,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return false; } - @Override - public IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) - throws IOException { - boolean isEnabled = settings.getAsBoolean("enabled", false); - String indexName = settings.get("name", index); - String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); - Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); - String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd"); - DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat) - .withZone(ZoneId.systemDefault()); - String fullName = indexName + dateTimeFormatter.format(LocalDate.now()); - String fullIndexName = resolveAlias(fullName).stream().findFirst().orElse(fullName); - IndexRetention indexRetention = new DefaultIndexRetention() - .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) - .setDelta(settings.getAsInt("retention.delta", 0)); - return new DefaultIndexDefinition() - .setEnabled(isEnabled) - .setIndex(indexName) - .setFullIndexName(fullIndexName) - .setSettings(findSettingsFrom(settings.get("settings"))) - .setMappings(findMappingsFrom(settings.get("mapping"))) - .setDateTimeFormatter(dateTimeFormatter) - .setDateTimePattern(dateTimePattern) - .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) - .setShift(settings.getAsBoolean("shift", true)) - .setPrune(settings.getAsBoolean("prune", true)) - .setReplicaLevel(settings.getAsInt("replica", 0)) - .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) - .setRetention(indexRetention) - .setStartRefreshInterval(settings.getAsLong("bulk.startrefreshinterval", -1L)) - .setStopRefreshInterval(settings.getAsLong("bulk.stoprefreshinterval", -1L)); - } - @Override public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { - ensureClientIsPresent(); if (index == null) { throw new IOException("no index name given"); } + ensureClientIsPresent(); Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) @@ -477,9 +430,12 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } @Override - public void checkMapping(String index) { + public void checkMapping(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return; + } ensureClientIsPresent(); - GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(indexDefinition.getFullIndexName()); GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); ImmutableOpenMap> map = getMappingsResponse.getMappings(); map.keys().forEach((Consumer>) stringObjectCursor -> { @@ -487,54 +443,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements for (ObjectObjectCursor cursor : mappings) { String mappingName = cursor.key; MappingMetaData mappingMetaData = cursor.value; - checkMapping(index, mappingName, mappingMetaData); + checkMapping(indexDefinition.getFullIndexName(), mappingName, mappingMetaData); } }); } - private static String findSettingsFrom(String string) throws IOException { - if (string == null) { - return null; - } - try { - URL url = new URL(string); - try (InputStream inputStream = url.openStream()) { - Settings settings = Settings.builder().loadFromStream(string, inputStream).build(); - XContentBuilder builder = JsonXContent.contentBuilder(); - settings.toXContent(builder, ToXContent.EMPTY_PARAMS); - return builder.string(); - } - } catch (MalformedURLException e) { - return string; - } - } - - private static String findMappingsFrom(String string) throws IOException { - if (string == null) { - return null; - } - try { - URL url = new URL(string); - try (InputStream inputStream = url.openStream()) { - if (string.endsWith(".json")) { - Map mappings = JsonXContent.jsonXContent.createParser(inputStream).mapOrdered(); - XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject().map(mappings).endObject(); - return builder.string(); - } - if (string.endsWith(".yml") || string.endsWith(".yaml")) { - Map mappings = YamlXContent.yamlXContent.createParser(inputStream).mapOrdered(); - XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject().map(mappings).endObject(); - return builder.string(); - } - } - return string; - } catch (MalformedInputException e) { - return string; - } - } - private Map getFilters(GetAliasesResponse getAliasesResponse) { Map result = new HashMap<>(); for (ObjectObjectCursor> object : getAliasesResponse.getAliases()) { @@ -654,7 +567,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } - private static class EmptyIndexShiftResult implements IndexShiftResult { @Override @@ -774,5 +686,4 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return "EMPTY PRUNE"; } } - } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index 0a6127e..422356d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.xbib.elx.api.BasicClient; +import org.xbib.elx.api.IndexDefinition; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,12 +33,6 @@ public abstract class AbstractBasicClient implements BasicClient { private static final Logger logger = LogManager.getLogger(AbstractBasicClient.class.getName()); - /** - * The one and only index type name used in the extended client. - * Note that all Elasticsearch versions before 6.2.0 do not allow a prepending "_". - */ - protected static final String TYPE_NAME = "doc"; - protected ElasticsearchClient client; protected Settings settings; @@ -144,18 +139,23 @@ public abstract class AbstractBasicClient implements BasicClient { } @Override - public long getSearchableDocs(String index) { + public long getSearchableDocs(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return -1L; + } + ensureClientIsPresent(); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) - .setIndices(index) + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()) .setSize(0); return searchRequestBuilder.execute().actionGet().getHits().getTotalHits(); } @Override - public boolean isIndexExists(String index) { + public boolean isIndexExists(IndexDefinition indexDefinition) { + ensureClientIsPresent(); IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); - indicesExistsRequest.indices(new String[]{index}); + indicesExistsRequest.indices(new String[] { indexDefinition.getFullIndexName() } ); IndicesExistsResponse indicesExistsResponse = client.execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); return indicesExistsResponse.isExists(); @@ -198,6 +198,14 @@ public abstract class AbstractBasicClient implements BasicClient { } } + protected boolean ensureIndexDefinition(IndexDefinition indexDefinition) { + if (!indexDefinition.isEnabled()) { + logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); + return false; + } + return true; + } + protected static TimeValue toTimeValue(long timeValue, TimeUnit timeUnit) { switch (timeUnit) { case DAYS: diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 33ca051..9b5c0a7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -1,5 +1,6 @@ package org.xbib.elx.common; +import com.google.common.base.Charsets; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -16,6 +17,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; @@ -70,6 +72,9 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public void newIndex(IndexDefinition indexDefinition) throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return; + } String index = indexDefinition.getFullIndexName(); if (index == null) { throw new IllegalArgumentException("no index name given"); @@ -81,16 +86,22 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements ensureClientIsPresent(); CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE) .setIndex(index); - Settings settings = indexDefinition.getSettings() == null ? null : - Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); - if (settings != null) { - createIndexRequestBuilder.setSettings(settings); + if (indexDefinition.getSettings() == null) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("index") + .field("number_of_shards", 1) + .field("number_of_replicas", 0) + .endObject() + .endObject(); + indexDefinition.setSettings(builder.string()); } + Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); + createIndexRequestBuilder.setSettings(settings); // must be Map to match prototype of addMapping()! Map mappings = indexDefinition.getMappings() == null ? null : JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); if (mappings != null) { - logger.info("mappings = " + mappings); createIndexRequestBuilder.addMapping(type, mappings); } else { createIndexRequestBuilder.addMapping(type, @@ -101,52 +112,45 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements logger.info("index {} created", index); } else { logger.warn("index creation of {} not acknowledged", index); + return; } - refreshIndex(index); + // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly. + logger.info("waiting for GREEN after index {} was created", index); + waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); } @Override public void startBulk(IndexDefinition indexDefinition) throws IOException { - startBulk(indexDefinition.getFullIndexName(), -1, 1); - } - - @Override - public void startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) - throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return; + } ensureClientIsPresent(); - bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); + bulkController.startBulkMode(indexDefinition); } @Override public void stopBulk(IndexDefinition indexDefinition) throws IOException { + if (!ensureIndexDefinition(indexDefinition)) { + return; + } ensureClientIsPresent(); bulkController.stopBulkMode(indexDefinition); } @Override - public void stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { - ensureClientIsPresent(); - bulkController.stopBulkMode(index, timeout, timeUnit); - } - - @Override - public BulkClient index(String index, String type, String id, boolean create, String source) { - return index(new IndexRequest() - .index(index) - .type(type) - .id(id) - .create(create) - .source(source)); // will be converted into a bytes reference + public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source) { + return index(indexDefinition, id, create, new BytesArray(source.getBytes(Charsets.UTF_8))); } @Override - public BulkClient index(String index, String type, String id, boolean create, BytesReference source) { + public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, BytesReference source) { + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } return index(new IndexRequest() - .index(index) - .type(type) - .id(id) - .create(create) - .source(source)); + .index(indexDefinition.getFullIndexName()) + .type(indexDefinition.getType()) + .id(id).create(create).source(source)); } @Override @@ -157,10 +161,13 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public BulkClient delete(String index, String type, String id) { + public BulkClient delete(IndexDefinition indexDefinition, String id) { + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } return delete(new DeleteRequest() - .index(index) - .type(type) + .index(indexDefinition.getFullIndexName()) + .type(indexDefinition.getType()) .id(id)); } @@ -172,17 +179,19 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public BulkClient update(String index, String type, String id, String source) { - return update(index, type, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); + public BulkClient update(IndexDefinition indexDefinition, String id, String source) { + return update(indexDefinition, id, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); } @Override - public BulkClient update(String index, String type, String id, BytesReference source) { + public BulkClient update(IndexDefinition indexDefinition, String id, BytesReference source) { + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } return update(new UpdateRequest() - .index(index) - .type(type) - .id(id) - .doc(source.hasArray() ? source.array() : source.toBytes())); + .index(indexDefinition.getFullIndexName()) + .type(indexDefinition.getType()) + .id(id).doc(source.hasArray() ? source.array() : source.toBytes())); } @Override @@ -204,18 +213,20 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public void flushIndex(String index) { - if (index != null) { - ensureClientIsPresent(); - client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet(); + public void flushIndex(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return; } + ensureClientIsPresent(); + client.execute(FlushAction.INSTANCE, new FlushRequest(indexDefinition.getFullIndexName())).actionGet(); } @Override - public void refreshIndex(String index) { - if (index != null) { - ensureClientIsPresent(); - client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); + public void refreshIndex(IndexDefinition indexDefinition) { + if (!ensureIndexDefinition(indexDefinition)) { + return; } + ensureClientIsPresent(); + client.execute(RefreshAction.INSTANCE, new RefreshRequest(indexDefinition.getFullIndexName())).actionGet(); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index 6816713..d672335 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -47,6 +47,17 @@ public class ClientBuilder { this.classLoader = classLoader; this.settingsBuilder = Settings.builder(); settingsBuilder.put("node.name", "elx-client-" + Version.CURRENT); + for (Parameters p : Parameters.values()) { + if (p.getType() == Boolean.class) { + settingsBuilder.put(p.getName(), p.getBoolean()); + } + if (p.getType() == Integer.class) { + settingsBuilder.put(p.getName(), p.getInteger()); + } + if (p.getType() == String.class) { + settingsBuilder.put(p.getName(), p.getString()); + } + } } public static ClientBuilder builder() { @@ -82,6 +93,11 @@ public class ClientBuilder { return this; } + public ClientBuilder put(String key, Boolean value) { + settingsBuilder.put(key, value); + return this; + } + public ClientBuilder put(String key, Long value) { settingsBuilder.put(key, value); return this; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 131e1d4..d487ba5 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -16,10 +16,6 @@ import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,11 +29,7 @@ public class DefaultBulkController implements BulkController { private BulkProcessor bulkProcessor; - private final List indexNames; - - private final Map startBulkRefreshIntervals; - - private final Map stopBulkRefreshIntervals; + private BulkListener bulkListener; private final long maxWaitTime; @@ -48,10 +40,7 @@ public class DefaultBulkController implements BulkController { public DefaultBulkController(BulkClient bulkClient) { this.bulkClient = bulkClient; this.bulkMetric = new DefaultBulkMetric(); - this.indexNames = new ArrayList<>(); this.active = new AtomicBoolean(false); - this.startBulkRefreshIntervals = new HashMap<>(); - this.stopBulkRefreshIntervals = new HashMap<>(); this.maxWaitTime = 30L; this.maxWaitTimeUnit = TimeUnit.SECONDS; } @@ -63,24 +52,27 @@ public class DefaultBulkController implements BulkController { @Override public Throwable getLastBulkError() { - return bulkProcessor.getBulkListener().getLastBulkError(); + return bulkListener.getLastBulkError(); } @Override public void init(Settings settings) { bulkMetric.init(settings); - int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.name(), - Parameters.DEFAULT_MAX_ACTIONS_PER_REQUEST.getNum()); - int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.name(), - Parameters.DEFAULT_MAX_CONCURRENT_REQUESTS.getNum()); - TimeValue flushIngestInterval = settings.getAsTime(Parameters.FLUSH_INTERVAL.name(), - TimeValue.timeValueSeconds(Parameters.DEFAULT_FLUSH_INTERVAL.getNum())); - ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), - ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), - "maxVolumePerRequest")); - boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), - Parameters.ENABLE_BULK_LOGGING.getValue()); - BulkListener bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); + int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), + Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); + int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), + Parameters.MAX_CONCURRENT_REQUESTS.getInteger()); + String flushIngestIntervalStr = settings.get(Parameters.FLUSH_INTERVAL.getName(), + Parameters.FLUSH_INTERVAL.getString()); + TimeValue flushIngestInterval = TimeValue.parseTimeValue(flushIngestIntervalStr, + TimeValue.timeValueSeconds(30), ""); + ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.getName(), + ByteSizeValue.parseBytesSizeValue(Parameters.MAX_VOLUME_PER_REQUEST.getString(), "1m")); + boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.getName(), + Parameters.ENABLE_BULK_LOGGING.getBoolean()); + boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), + Parameters.FAIL_ON_BULK_ERROR.getBoolean()); + this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging, failOnBulkError); this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) @@ -90,9 +82,13 @@ public class DefaultBulkController implements BulkController { this.active.set(true); if (logger.isInfoEnabled()) { logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + - "flushIngestInterval = {} maxVolumePerRequest = {} bulk logging = {} logger debug = {} from settings = {}", - maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest, - enableBulkLogging, logger.isDebugEnabled(), settings.toDelimitedString(',')); + "flushIngestInterval = {} maxVolumePerRequest = {} " + + "bulk logging = {} fail on bulk error = {} " + + "logger debug = {} from settings = {}", + maxActionsPerRequest, maxConcurrentRequests, + flushIngestInterval, maxVolumePerRequest, + enableBulkLogging, failOnBulkError, + logger.isDebugEnabled(), settings.toDelimitedString(',')); } } @@ -103,22 +99,11 @@ public class DefaultBulkController implements BulkController { @Override public void startBulkMode(IndexDefinition indexDefinition) throws IOException { - startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(), - indexDefinition.getStopRefreshInterval()); - } - - @Override - public void startBulkMode(String indexName, - long startRefreshIntervalInSeconds, - long stopRefreshIntervalInSeconds) throws IOException { - if (!indexNames.contains(indexName)) { - indexNames.add(indexName); - startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds); - stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds); - if (startRefreshIntervalInSeconds != 0L) { - bulkClient.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s", - 30L, TimeUnit.SECONDS); - } + String indexName = indexDefinition.getFullIndexName(); + if (indexDefinition.getStartBulkRefreshSeconds() != 0) { + bulkClient.updateIndexSetting(indexName, "refresh_interval", + indexDefinition.getStartBulkRefreshSeconds() + "s", + 30L, TimeUnit.SECONDS); } } @@ -167,31 +152,30 @@ public class DefaultBulkController implements BulkController { @Override public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { try { - return bulkProcessor.awaitFlush(timeout, timeUnit); + if (bulkProcessor != null) { + bulkProcessor.flush(); + return bulkProcessor.awaitFlush(timeout, timeUnit); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("interrupted"); return false; + } catch (IOException e) { + logger.error(e.getMessage(), e); + return false; } + return false; } @Override public void stopBulkMode(IndexDefinition indexDefinition) throws IOException { - stopBulkMode(indexDefinition.getFullIndexName(), - indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); - } - - @Override - public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException { flush(); - if (waitForBulkResponses(timeout, timeUnit)) { - if (indexNames.contains(index)) { - Long secs = stopBulkRefreshIntervals.get(index); - if (secs != null && secs != 0L) { - bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", - 30L, TimeUnit.SECONDS); - } - indexNames.remove(index); + if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { + if (indexDefinition.getStopBulkRefreshSeconds() != 0) { + bulkClient.updateIndexSetting(indexDefinition.getFullIndexName(), + "refresh_interval", + indexDefinition.getStopBulkRefreshSeconds() + "s", + 30L, TimeUnit.SECONDS); } } } @@ -206,15 +190,7 @@ public class DefaultBulkController implements BulkController { @Override public void close() throws IOException { flush(); - if (bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit)) { - for (String index : indexNames) { - Long secs = stopBulkRefreshIntervals.get(index); - if (secs != null && secs != 0L) - bulkClient.updateIndexSetting(index, "refresh_interval", secs + "s", - 30L, TimeUnit.SECONDS); - } - indexNames.clear(); - } + bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); if (bulkProcessor != null) { bulkProcessor.close(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index db0ea42..2d25d8f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -19,14 +19,18 @@ public class DefaultBulkListener implements BulkListener { private final boolean isBulkLoggingEnabled; + private final boolean failOnError; + private Throwable lastBulkError = null; public DefaultBulkListener(BulkController bulkController, BulkMetric bulkMetric, - boolean isBulkLoggingEnabled) { + boolean isBulkLoggingEnabled, + boolean failOnError) { this.bulkController = bulkController; this.bulkMetric = bulkMetric; this.isBulkLoggingEnabled = isBulkLoggingEnabled; + this.failOnError = failOnError; } @Override @@ -73,6 +77,10 @@ public class DefaultBulkListener implements BulkListener { logger.error("bulk [{}] failed with {} failed items, failure message = {}", executionId, n, response.buildFailureMessage()); } + if (failOnError) { + throw new IllegalStateException("bulk failed: id = " + executionId + + " n = " + n + " message = " + response.buildFailureMessage()); + } } else { bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index 27065ec..8bbd250 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -1,10 +1,24 @@ package org.xbib.elx.common; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.yaml.YamlXContent; +import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexRetention; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.MalformedInputException; +import java.time.LocalDate; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -42,13 +56,50 @@ public class DefaultIndexDefinition implements IndexDefinition { private TimeUnit maxWaitTimeUnit; - private long startRefreshInterval; + private int startRefreshInterval; - private long stopRefreshInterval; + private int stopRefreshInterval; - public DefaultIndexDefinition() { + public DefaultIndexDefinition(String index, String type) { + setIndex(index); + setType(type); setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); + setFullIndexName(index + getDateTimeFormatter().format(LocalDate.now())); + setEnabled(true); + setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS); + } + + public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) + throws IOException { + boolean isEnabled = settings.getAsBoolean("enabled", true); + String indexName = settings.get("name", index); + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); + Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); + String dateTimeFormat = settings.get("dateTimeFormat", "yyyyMMdd"); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat) + .withZone(ZoneId.systemDefault()); + String fullName = indexName + dateTimeFormatter.format(LocalDate.now()); + String fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); + IndexRetention indexRetention = new DefaultIndexRetention() + .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) + .setDelta(settings.getAsInt("retention.delta", 0)); + setEnabled(isEnabled) + .setIndex(indexName) + .setType(type) + .setFullIndexName(fullIndexName) + .setSettings(findSettingsFrom(settings.get("settings"))) + .setMappings(findMappingsFrom(settings.get("mapping"))) + .setDateTimeFormatter(dateTimeFormatter) + .setDateTimePattern(dateTimePattern) + .setIgnoreErrors(settings.getAsBoolean("skiperrors", false)) + .setShift(settings.getAsBoolean("shift", true)) + .setPrune(settings.getAsBoolean("prune", true)) + .setReplicaLevel(settings.getAsInt("replica", 0)) + .setMaxWaitTime(settings.getAsLong("timeout", 30L), TimeUnit.SECONDS) + .setRetention(indexRetention) + .setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1)) + .setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1)); } @Override @@ -129,6 +180,28 @@ public class DefaultIndexDefinition implements IndexDefinition { return pattern; } + @Override + public IndexDefinition setStartBulkRefreshSeconds(int seconds) { + this.startRefreshInterval = seconds; + return this; + } + + @Override + public int getStartBulkRefreshSeconds() { + return startRefreshInterval; + } + + @Override + public IndexDefinition setStopBulkRefreshSeconds(int seconds) { + this.stopRefreshInterval = seconds; + return this; + } + + @Override + public int getStopBulkRefreshSeconds() { + return stopRefreshInterval; + } + @Override public IndexDefinition setEnabled(boolean enabled) { this.enabled = enabled; @@ -223,26 +296,47 @@ public class DefaultIndexDefinition implements IndexDefinition { return maxWaitTimeUnit; } - @Override - public IndexDefinition setStartRefreshInterval(long seconds) { - this.startRefreshInterval = seconds; - return this; - } - @Override - public long getStartRefreshInterval() { - return startRefreshInterval; + private static String findSettingsFrom(String string) throws IOException { + if (string == null) { + return null; + } + try { + URL url = new URL(string); + try (InputStream inputStream = url.openStream()) { + Settings settings = Settings.builder().loadFromStream(string, inputStream).build(); + XContentBuilder builder = JsonXContent.contentBuilder(); + settings.toXContent(builder, ToXContent.EMPTY_PARAMS); + return builder.string(); + } + } catch (MalformedURLException e) { + return string; + } + } + + private static String findMappingsFrom(String string) throws IOException { + if (string == null) { + return null; + } + try { + URL url = new URL(string); + try (InputStream inputStream = url.openStream()) { + if (string.endsWith(".json")) { + Map mappings = JsonXContent.jsonXContent.createParser(inputStream).mapOrdered(); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject().map(mappings).endObject(); + return builder.string(); + } + if (string.endsWith(".yml") || string.endsWith(".yaml")) { + Map mappings = YamlXContent.yamlXContent.createParser(inputStream).mapOrdered(); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject().map(mappings).endObject(); + return builder.string(); + } + } + return string; + } catch (MalformedInputException e) { + return string; + } } - - @Override - public IndexDefinition setStopRefreshInterval(long seconds) { - this.stopRefreshInterval = seconds; - return this; - } - - @Override - public long getStopRefreshInterval() { - return stopRefreshInterval; - } - } diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index ebc1450..6e25fef 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -28,28 +28,12 @@ public class MockAdminClient extends AbstractAdminClient { protected void closeClient(Settings settings) { } - @Override - public MockAdminClient deleteIndex(String index) { - return this; - } - - @Override - public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { - return true; - } - @Override public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) { } @Override public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { - - } - - @Override - public MockAdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) { - return this; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index a2f3f46..e0e861b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -39,21 +39,6 @@ public class MockBulkClient extends AbstractBulkClient { protected void closeClient(Settings settings) { } - @Override - public MockBulkClient index(String index, String type, String id, boolean create, String source) { - return this; - } - - @Override - public MockBulkClient delete(String index, String type, String id) { - return this; - } - - @Override - public MockBulkClient update(String index, String type, String id, String source) { - return this; - } - @Override public MockBulkClient index(IndexRequest indexRequest) { return this; @@ -69,28 +54,11 @@ public class MockBulkClient extends AbstractBulkClient { return this; } - @Override - public void startBulk(String index, long startRefreshInterval, long stopRefreshIterval) { - } - - @Override - public void stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) { - } - @Override public boolean waitForResponses(long maxWaitTime, TimeUnit timeUnit) { return true; } - @Override - public void refreshIndex(String index) { - } - - @Override - public void flushIndex(String index) { - } - - @Override public void flush() { // nothing to do diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index 73819e1..e042904 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,51 +2,54 @@ package org.xbib.elx.common; public enum Parameters { - ENABLE_BULK_LOGGING(false), + MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30), - DEFAULT_MAX_ACTIONS_PER_REQUEST(1000), + START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0), - DEFAULT_MAX_CONCURRENT_REQUESTS(Runtime.getRuntime().availableProcessors()), + STOP_BULK_REFRESH_SECONDS("bulk.stop_refresh_seconds", Integer.class, 30), - DEFAULT_MAX_VOLUME_PER_REQUEST("10mb"), + ENABLE_BULK_LOGGING("bulk.logging.enabled", Boolean.class, true), - DEFAULT_FLUSH_INTERVAL(30), + FAIL_ON_BULK_ERROR("bulk.failonerror", Boolean.class, true), - MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"), + MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000), - MAX_CONCURRENT_REQUESTS("max_concurrent_requests"), + // 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests + MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1), - MAX_VOLUME_PER_REQUEST("max_volume_per_request"), + MAX_VOLUME_PER_REQUEST("bulk.max_volume_per_request", String.class, "1mb"), - FLUSH_INTERVAL("flush_interval"); + FLUSH_INTERVAL("bulk.flush_interval", String.class, "30s"); - boolean flag; + private final String name; - int num; + private final Class type; - String string; + private final Object value; - Parameters(boolean flag) { - this.flag = flag; + Parameters(String name, Class type, Object value) { + this.name = name; + this.type = type; + this.value = value; } - Parameters(int num) { - this.num = num; + public String getName() { + return name; } - Parameters(String string) { - this.string = string; + public Class getType() { + return type; } - boolean getValue() { - return flag; + public Boolean getBoolean() { + return type == Boolean.class ? (Boolean) value : Boolean.FALSE; } - int getNum() { - return num; + public Integer getInteger() { + return type == Integer.class ? (Integer) value : 0; } - String getString() { - return string; + public String getString() { + return type == String.class ? (String) value : null; } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/io/ClasspathURLStreamHandler.java b/elx-common/src/main/java/org/xbib/elx/common/io/ClasspathURLStreamHandler.java deleted file mode 100644 index e7d8727..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/io/ClasspathURLStreamHandler.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.xbib.elx.common.io; - -import java.io.IOException; -import java.net.URL; -import java.net.URLConnection; -import java.net.URLStreamHandler; - -public class ClasspathURLStreamHandler extends URLStreamHandler { - - private final ClassLoader classLoader; - - public ClasspathURLStreamHandler() { - this.classLoader = getClass().getClassLoader(); - } - - public ClasspathURLStreamHandler(ClassLoader classLoader) { - this.classLoader = classLoader; - } - - @Override - protected URLConnection openConnection(URL u) throws IOException { - final URL resourceUrl = classLoader.getResource(u.getPath()); - return resourceUrl != null ? resourceUrl.openConnection() : null; - } -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/io/ClasspathURLStreamHandlerFactory.java b/elx-common/src/main/java/org/xbib/elx/common/io/ClasspathURLStreamHandlerFactory.java deleted file mode 100644 index 00c7c83..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/io/ClasspathURLStreamHandlerFactory.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.xbib.elx.common.io; - -import java.net.URLStreamHandler; -import java.net.URLStreamHandlerFactory; - -public class ClasspathURLStreamHandlerFactory implements URLStreamHandlerFactory { - - @Override - public URLStreamHandler createURLStreamHandler(String protocol) { - return "classpath".equals(protocol) ? new ClasspathURLStreamHandler() : null; - } -} diff --git a/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java b/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java deleted file mode 100644 index 5e86ba1..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/io/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * I/O helpers for Elasticsearch client extensions. - */ -package org.xbib.elx.common.io; diff --git a/elx-common/src/main/java/org/xbib/elx/common/package-info.java b/elx-common/src/main/java/org/xbib/elx/common/package-info.java deleted file mode 100644 index 4971f08..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Common classes for Elasticsearch client extensions. - */ -package org.xbib.elx.common; diff --git a/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java b/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java deleted file mode 100644 index cd393c9..0000000 --- a/elx-common/src/main/java/org/xbib/elx/common/util/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Utilities for Elasticsearch client extensions. - */ -package org.xbib.elx.common.util; diff --git a/elx-common/src/main/resources/META-INF/services/java.net.URLStreamHandlerFactory b/elx-common/src/main/resources/META-INF/services/java.net.URLStreamHandlerFactory deleted file mode 100644 index bb6d620..0000000 --- a/elx-common/src/main/resources/META-INF/services/java.net.URLStreamHandlerFactory +++ /dev/null @@ -1 +0,0 @@ -org.xbib.elx.common.io.ClasspathURLStreamHandlerFactory \ No newline at end of file diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java b/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java deleted file mode 100644 index 9d006c1..0000000 --- a/elx-common/src/test/java/org/xbib/elx/common/test/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * - */ -package org.xbib.elx.common.test; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index b63e72f..5cd6e40 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -2,7 +2,6 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; @@ -45,14 +44,12 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); - bulkClient.index("test", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); @@ -68,11 +65,9 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) + .put(Parameters.FLUSH_INTERVAL.getName(), "5s") .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); } } @@ -87,6 +82,7 @@ class BulkClientTest { .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); XContentBuilder builder = JsonXContent.contentBuilder() .startObject() .startObject("doc") @@ -97,13 +93,9 @@ class BulkClientTest { .endObject() .endObject() .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test"); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); indexDefinition.setMappings(builder.string()); bulkClient.newIndex(indexDefinition); - assertTrue(adminClient.getMapping("test").containsKey("properties")); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } @@ -113,26 +105,25 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", "doc", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); + bulkClient.stopBulk(indexDefinition); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } } @@ -145,29 +136,20 @@ class BulkClientTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) + .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - indexDefinition.setSettings(builder.string()); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setStartBulkRefreshSeconds(0); bulkClient.newIndex(indexDefinition); - bulkClient.startBulk("test", 0, 1000); + bulkClient.startBulk(indexDefinition); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { executorService.execute(() -> { for (int i1 = 0; i1 < actions; i1++) { - bulkClient.index("test", "doc", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } latch.countDown(); @@ -185,10 +167,10 @@ class BulkClientTest { } else { logger.warn("latch timeout"); } - bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index 4fd3ee7..fb0e255 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -38,20 +38,18 @@ class DuplicateIDTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", "doc", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); + bulkClient.refreshIndex(indexDefinition); + assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 3d7ac19..b4ccc54 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -2,8 +2,6 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; @@ -47,33 +45,23 @@ class IndexPruneTest { .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test_prune"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test_prune", "doc"); indexDefinition.setFullIndexName("test_prune1"); - indexDefinition.setSettings(builder.string()); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune2"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune3"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune4"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); IndexRetention indexRetention = new DefaultIndexRetention(); indexRetention.setDelta(2); indexRetention.setMinToKeep(2); @@ -90,7 +78,9 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - list.add(adminClient.isIndexExists(index)); + IndexDefinition indexDefinition1 = new DefaultIndexDefinition(index, null); + indexDefinition1.setFullIndexName(index); + list.add(adminClient.isIndexExists(indexDefinition1)); } logger.info(list); assertFalse(list.get(0)); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index b2e642f..a5dd7c4 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -4,8 +4,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.cluster.metadata.AliasAction; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -46,27 +44,18 @@ class IndexShiftTest { .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); - indexDefinition.setSettings(builder.string()); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift", "doc", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); indexDefinition.setShift(true); - IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c")); + IndexShiftResult indexShiftResult = + adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); @@ -77,7 +66,8 @@ class IndexShiftTest { assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("test")); Optional resolved = adminClient.resolveAlias("test").stream().findFirst(); - aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); + aliases = resolved.isPresent() ? + adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); @@ -85,7 +75,7 @@ class IndexShiftTest { indexDefinition.setFullIndexName("test_shift2"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift2", "doc", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index ee0f5df..2646979 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -2,6 +2,7 @@ package org.xbib.elx.node.test; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.unit.TimeValue; @@ -26,9 +27,9 @@ class SearchTest { private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); - private static final Long ACTIONS = 1000L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; + private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; private final TestExtension.Helper helper; @@ -39,23 +40,22 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < numactions; i++) { - bulkClient.index("test", "doc", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.stopBulk(indexDefinition); + assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); @@ -68,7 +68,7 @@ class SearchTest { .build()) { // test stream count Stream stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 570); long count = stream.count(); @@ -78,7 +78,7 @@ class SearchTest { assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); // test stream docs stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(10), 79); final AtomicInteger hitcount = new AtomicInteger(); @@ -89,7 +89,7 @@ class SearchTest { assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); // test stream doc ids Stream ids = searchClient.getIds(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> { diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index be7ce6c..9dcac97 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; @@ -14,6 +15,7 @@ import org.xbib.elx.node.NodeBulkClientProvider; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -38,34 +40,29 @@ class SmokeTest { .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { + assertEquals(helper.getClusterName(), adminClient.getClusterName()); IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke_definition", Settings.EMPTY); + new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); + assertEquals("test_smoke", indexDefinition.getIndex()); + assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); assertEquals(0, indexDefinition.getReplicaLevel()); - assertEquals(helper.getClusterName(), adminClient.getClusterName()); - indexDefinition.setFullIndexName("test_smoke"); indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); - logger.info("new index: done"); - bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - logger.info("index doc: done"); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); - logger.info("flush: done"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - logger.info("wait: done"); - adminClient.checkMapping("test_smoke"); - logger.info("check mapping: done"); - bulkClient.update("test_smoke", "doc", "1", "{ \"name\" : \"Another name\"}"); - bulkClient.delete("test_smoke", "doc", "1"); + adminClient.checkMapping(indexDefinition); + bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete(indexDefinition, "1"); bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.delete("test_smoke", "doc", "1"); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete(indexDefinition, "1"); bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.deleteIndex("test_smoke"); - logger.info("delete index: done"); + adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition.getFullIndexName(), "doc", "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.updateReplicaLevel(indexDefinition, 2); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java b/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java deleted file mode 100644 index 662c94e..0000000 --- a/elx-node/src/test/java/org/xbib/elx/node/test/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * - */ -package org.xbib.elx.node.test; diff --git a/elx-node/src/test/resources/log4j2-test.xml b/elx-node/src/test/resources/log4j2-test.xml index 11bffcf..732ac2e 100644 --- a/elx-node/src/test/resources/log4j2-test.xml +++ b/elx-node/src/test/resources/log4j2-test.xml @@ -6,7 +6,7 @@ - + diff --git a/elx-common/src/main/java/org/xbib/elx/common/util/NetworkUtils.java b/elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java similarity index 99% rename from elx-common/src/main/java/org/xbib/elx/common/util/NetworkUtils.java rename to elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java index 11dd014..35f85ac 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/util/NetworkUtils.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java @@ -1,4 +1,4 @@ -package org.xbib.elx.common.util; +package org.xbib.elx.transport; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 5ce2a5a..08ee421 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.jboss.netty.channel.DefaultChannelFuture; -import org.xbib.elx.common.util.NetworkUtils; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/package-info.java b/elx-transport/src/main/java/org/xbib/elx/transport/package-info.java deleted file mode 100644 index 3697854..0000000 --- a/elx-transport/src/main/java/org/xbib/elx/transport/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Classes for Elasticsearch transport client extensions. - */ -package org.xbib.elx.transport; diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index b1521a0..4f9dc55 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -2,7 +2,6 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; @@ -45,14 +44,12 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); - bulkClient.index("test", "docd", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); @@ -69,9 +66,7 @@ class BulkClientTest { .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); } } @@ -96,12 +91,10 @@ class BulkClientTest { .endObject() .endObject() .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setMappings(builder.string()); bulkClient.newIndex(indexDefinition); - assertTrue(adminClient.getMapping("test").containsKey("properties")); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } @@ -111,15 +104,13 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", "docs", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); @@ -129,8 +120,8 @@ class BulkClientTest { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } } @@ -143,32 +134,24 @@ class BulkClientTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads * 2) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .put(Parameters.ENABLE_BULK_LOGGING.name(), "true") + .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads * 2) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") + .put(Parameters.ENABLE_BULK_LOGGING.getName(), Boolean.TRUE) .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test"); - indexDefinition.setSettings(builder.string()); + indexDefinition.setStartBulkRefreshSeconds(0); + indexDefinition.setStopBulkRefreshSeconds(60); bulkClient.newIndex(indexDefinition); - bulkClient.startBulk("test", 0, 1000); + bulkClient.startBulk(indexDefinition); logger.info("index created"); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { executorService.execute(() -> { for (int i1 = 0; i1 < actions; i1++) { - bulkClient.index("test", "docs", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } latch.countDown(); @@ -186,10 +169,10 @@ class BulkClientTest { } else { logger.warn("latch timeout"); } - bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index 0a5477a..36045d7 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -36,20 +36,18 @@ class DuplicateIDTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", "docs", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); + bulkClient.refreshIndex(indexDefinition); + assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index c2f1b1c..aa9bc61 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -2,8 +2,6 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; @@ -48,33 +46,23 @@ class IndexPruneTest { .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test_prune"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_prune1"); - indexDefinition.setSettings(builder.string()); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune2"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune3"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune4"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); IndexRetention indexRetention = new DefaultIndexRetention(); indexRetention.setDelta(2); indexRetention.setMinToKeep(2); @@ -91,7 +79,9 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - list.add(adminClient.isIndexExists(index)); + IndexDefinition indexDefinition1 = new DefaultIndexDefinition(index, null); + indexDefinition1.setFullIndexName(index); + list.add(adminClient.isIndexExists(indexDefinition1)); } logger.info(list); assertFalse(list.get(0)); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 1a06d7a..db0f692 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -4,8 +4,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.cluster.metadata.AliasAction; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -48,28 +46,18 @@ class IndexShiftTest { .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("index") - .field("number_of_shards", 1) - .field("number_of_replicas", 0) - .endObject() - .endObject(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test"); - indexDefinition.setType("doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setFullIndexName("test_shift"); - indexDefinition.setSettings(builder.string()); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift", "doc", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); indexDefinition.setShift(true); IndexShiftResult indexShiftResult = - adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c")); + adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); @@ -80,7 +68,8 @@ class IndexShiftTest { assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("test")); Optional resolved = adminClient.resolveAlias("test").stream().findFirst(); - aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); + aliases = resolved.isPresent() ? + adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); @@ -88,7 +77,7 @@ class IndexShiftTest { indexDefinition.setFullIndexName("test_shift2"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift2", "doc", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 48e02b7..384f681 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -22,13 +22,14 @@ import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class SearchTest { private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); - private static final Long ACTIONS = 1000L; + private static final Long ACTIONS = 100000L; private static final Long MAX_ACTIONS_PER_REQUEST = 100L; @@ -41,23 +42,22 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setFullIndexName("test"); - indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < numactions; i++) { - bulkClient.index("test", "doc", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.stopBulk(indexDefinition); + assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); @@ -70,7 +70,7 @@ class SearchTest { .build()) { // test stream count Stream stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 570); long count = stream.count(); @@ -80,7 +80,7 @@ class SearchTest { assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); // test stream docs stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(10), 79); final AtomicInteger hitcount = new AtomicInteger(); @@ -91,7 +91,7 @@ class SearchTest { assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); // test stream doc ids Stream ids = searchClient.getIds(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> { diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 3c62102..93813a5 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; @@ -16,6 +17,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class SmokeTest { @@ -39,28 +41,24 @@ class SmokeTest { .put(helper.getTransportSettings()) .build()) { IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); + assertEquals("test_smoke", indexDefinition.getIndex()); + assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); - indexDefinition.setFullIndexName("test_smoke"); - indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); - bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.checkMapping("test_smoke"); - bulkClient.update("test_smoke", "doc", "1", "{ \"name\" : \"Another name\"}"); - bulkClient.delete("test_smoke", "doc", "1"); - bulkClient.flush(); + adminClient.checkMapping(indexDefinition); + bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete(indexDefinition, "1"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - bulkClient.index("test_smoke", "doc", "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.delete("test_smoke", "doc", "1"); - bulkClient.flush(); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete(indexDefinition, "1"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.deleteIndex("test_smoke"); + adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition.getFullIndexName(), "doc", "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.flush(); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/package-info.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/package-info.java deleted file mode 100644 index b038c6f..0000000 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package org.xbib.elx.transport.test; \ No newline at end of file diff --git a/elx-transport/src/test/resources/log4j2-test.xml b/elx-transport/src/test/resources/log4j2-test.xml index 11bffcf..732ac2e 100644 --- a/elx-transport/src/test/resources/log4j2-test.xml +++ b/elx-transport/src/test/resources/log4j2-test.xml @@ -6,7 +6,7 @@ - + diff --git a/gradle.properties b/gradle.properties index fb5bf3f..7d9b204 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = elx -version = 2.2.1.31 +version = 2.2.1.32 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0