From 6d434a98c64d2c04c2fc7a7c49d40b269694ed14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 21 Apr 2021 14:44:58 +0200 Subject: [PATCH] align with es7102 --- .../java/org/xbib/elx/api/AdminClient.java | 4 +- .../java/org/xbib/elx/api/BasicClient.java | 6 +- .../java/org/xbib/elx/api/BulkClient.java | 12 +-- .../java/org/xbib/elx/api/BulkProcessor.java | 1 - .../xbib/elx/common/AbstractAdminClient.java | 63 +++++++------- .../xbib/elx/common/AbstractBasicClient.java | 9 +- .../xbib/elx/common/AbstractBulkClient.java | 62 +++++++------ .../xbib/elx/common/AbstractSearchClient.java | 21 ++++- .../xbib/elx/common/DefaultBulkListener.java | 1 + .../xbib/elx/common/DefaultBulkProcessor.java | 10 ++- .../org/xbib/elx/common/test/AliasTest.java | 14 +-- .../org/xbib/elx/common/test/SearchTest.java | 2 +- .../org/xbib/elx/common/test/SimpleTest.java | 10 +-- .../xbib/elx/common/test/TestExtension.java | 44 +++------- .../xbib/elx/common/test/WildcardTest.java | 2 +- .../resources/{log4j2.xml => log4j2-test.xml} | 0 .../org/xbib/elx/node/NodeAdminClient.java | 2 +- .../org/xbib/elx/node/NodeBulkClient.java | 2 +- .../org/xbib/elx/node/NodeClientHelper.java | 4 +- .../org/xbib/elx/node/NodeSearchClient.java | 2 +- .../xbib/elx/node/test/BulkClientTest.java | 87 ++++++------------- .../xbib/elx/node/test/DuplicateIDTest.java | 11 ++- .../xbib/elx/node/test/IndexPruneTest.java | 18 ++-- .../xbib/elx/node/test/IndexShiftTest.java | 26 +++--- .../org/xbib/elx/node/test/SearchTest.java | 26 +++--- .../org/xbib/elx/node/test/SmokeTest.java | 35 +++++--- .../org/xbib/elx/node/test/TestExtension.java | 21 ++--- .../org/xbib/elx/transport/NetworkUtils.java | 3 - .../elx/transport/TransportAdminClient.java | 2 +- .../elx/transport/TransportBulkClient.java | 2 +- .../elx/transport/TransportSearchClient.java | 2 +- .../elx/transport/test/BulkClientTest.java | 81 +++++------------ .../elx/transport/test/DuplicateIDTest.java | 13 +-- .../elx/transport/test/IndexPruneTest.java | 11 +-- .../elx/transport/test/IndexShiftTest.java | 22 +++-- .../xbib/elx/transport/test/SearchTest.java | 22 ++--- .../xbib/elx/transport/test/SmokeTest.java | 31 +++++-- 37 files changed, 316 insertions(+), 368 deletions(-) rename elx-common/src/test/resources/{log4j2.xml => log4j2-test.xml} (100%) diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index 0ac9d13..0c5e71a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -47,7 +47,7 @@ public interface AdminClient extends BasicClient { * Resolve alias. * * @param alias the alias - * @return this index name behind the alias or the alias if there is no index + * @return the index names in ordered sequence behind the alias or an empty list if there is no such alias */ List resolveAlias(String alias); @@ -89,7 +89,7 @@ public interface AdminClient extends BasicClient { /** * Find the timestamp of the most recently indexed document in the index. * - * @param indexDefinition the index + * @param indexDefinition the index definition * @param timestampfieldname the timestamp field name * @return millis UTC millis of the most recent document * @throws IOException if most rcent document can not be found diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index cd0bee8..fe0ead6 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -52,16 +52,16 @@ public interface BasicClient extends Closeable { * Wait for cluster being healthy. * * @param healthColor cluster health color to wait for - * @param maxWaitTime time value + * @param maxWaitTime time value * @param timeUnit time unit */ void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); void waitForShards(long maxWaitTime, TimeUnit timeUnit); - long getSearchableDocs(IndexDefinition index); + long getSearchableDocs(IndexDefinition indexDefinition); - boolean isIndexExists(IndexDefinition index); + boolean isIndexExists(IndexDefinition indexDefinition); ScheduledThreadPoolExecutor getScheduler(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 52c3dce..13920db 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -36,8 +36,8 @@ public interface BulkClient extends BasicClient, Flushable { * Add index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param indexDefinition the index definition - * @param id the id + * @param indexDefinition the index definition + * @param id the id * @param create true if document must be created * @param source the source * @return this @@ -48,8 +48,8 @@ public interface BulkClient extends BasicClient, Flushable { * Index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param indexDefinition the index definition - * @param id the id + * @param indexDefinition the index definition + * @param id the id * @param create true if document is to be created, false otherwise * @param source the source * @return this client methods @@ -88,7 +88,7 @@ public interface BulkClient extends BasicClient, Flushable { * Submitting request will be done when bulk limits are exceeded. * Note that updates only work correctly when all operations between nodes are synchronized. * - * @param indexDefinition the index definition + * @param indexDefinition the index definition * @param id the id * @param source the source * @return this @@ -149,5 +149,5 @@ public interface BulkClient extends BasicClient, Flushable { */ void flushIndex(IndexDefinition indexDefinition); - BulkProcessor getBulkController(); + BulkProcessor getBulkProcessor(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index b313483..f4508b0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -5,7 +5,6 @@ import org.elasticsearch.action.ActionRequest; import java.io.Closeable; import java.io.Flushable; import java.io.IOException; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 7d4c094..f271fc1 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; @@ -54,6 +55,7 @@ import org.xbib.elx.api.IndexPruneResult; import org.xbib.elx.api.IndexShiftResult; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -67,9 +69,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -81,7 +81,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); @Override - public Map getMapping(IndexDefinition indexDefinition) throws IOException { + public Map getMapping(IndexDefinition indexDefinition) { if (isIndexDefinitionDisabled(indexDefinition)) { return null; } @@ -89,10 +89,14 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements .setIndices(indexDefinition.getFullIndexName()) .setTypes(indexDefinition.getType()); GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); - return getMappingsResponse.getMappings() - .get(indexDefinition.getFullIndexName()) - .get(indexDefinition.getType()) - .getSourceAsMap(); + try { + return getMappingsResponse.getMappings() + .get(indexDefinition.getFullIndexName()) + .get(indexDefinition.getType()) + .getSourceAsMap(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override @@ -206,17 +210,18 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return new EmptyIndexShiftResult(); } if (indexDefinition.isShiftEnabled()) { - return shiftIndex(indexDefinition.getIndex(), - indexDefinition.getFullIndexName(), additionalAliases.stream() + return shiftIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), + additionalAliases.stream() .filter(a -> a != null && !a.isEmpty()) .collect(Collectors.toList()), indexAliasAdder); } return new EmptyIndexShiftResult(); } - private IndexShiftResult shiftIndex(String index, String fullIndexName, - List additionalAliases, - IndexAliasAdder adder) { + private IndexShiftResult shiftIndex(String index, + String fullIndexName, + List additionalAliases, + IndexAliasAdder adder) { ensureClientIsPresent(); if (index == null) { return new EmptyIndexShiftResult(); // nothing to shift to @@ -289,7 +294,9 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { - return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() ? + return indexDefinition != null && indexDefinition.isEnabled() && indexDefinition.isPruneEnabled() && + indexDefinition.getRetention() != null && + indexDefinition.getDateTimePattern() != null ? pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), indexDefinition.getDateTimePattern(), @@ -315,7 +322,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements ensureClientIsPresent(); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); - logger.info("before pruning: protected = " + protectedIndexName + " found total of {} indices", getIndexResponse.getIndices().length); + logger.info("before pruning: found total of {} indices", getIndexResponse.getIndices().length); List candidateIndices = new ArrayList<>(); for (String s : getIndexResponse.getIndices()) { Matcher m = pattern.matcher(s); @@ -372,7 +379,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(indexDefinition.getFullIndexName()); searchRequest.source(builder); - SearchResponse searchResponse = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); + SearchResponse searchResponse = + client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); if (searchResponse.getHits().getHits().length == 1) { SearchHit hit = searchResponse.getHits().getHits()[0]; if (hit.getFields().get(timestampfieldname) != null) { @@ -394,22 +402,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements return false; } ensureClientIsPresent(); - String index = indexDefinition.getFullIndexName(); + logger.info("force merge of " + indexDefinition); ForceMergeRequest forceMergeRequest = new ForceMergeRequest(); - forceMergeRequest.indices(index); - try { - client.execute(ForceMergeAction.INSTANCE, forceMergeRequest) - .get(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); - return true; - } catch (TimeoutException e) { - logger.error("timeout"); - } catch (ExecutionException e) { - logger.error(e.getMessage(), e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error(e.getMessage(), e); + forceMergeRequest.indices(indexDefinition.getFullIndexName()); + ForceMergeResponse forceMergeResponse = + client.execute(ForceMergeAction.INSTANCE, forceMergeRequest).actionGet(); + if (forceMergeResponse.getFailedShards() > 0) { + throw new IllegalStateException("failed shards after force merge: " + forceMergeResponse.getFailedShards()); } - return false; + waitForCluster("GREEN", 300L, TimeUnit.SECONDS); + return true; } @Override @@ -467,7 +469,8 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements .setTypes(type) .setQuery(QueryBuilders.matchAllQuery()) .setSize(0); - SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); + SearchResponse searchResponse = + searchRequestBuilder.execute().actionGet(); long total = searchResponse.getHits().getTotalHits(); if (total > 0L) { Map fields = new TreeMap<>(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java index a00998b..b1429f5 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBasicClient.java @@ -116,7 +116,7 @@ public abstract class AbstractBasicClient implements BasicClient { Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); + updateSettingsRequest.transientSettings(updateSettingsBuilder).timeout(toTimeValue(timeout, timeUnit)); client.execute(ClusterUpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } @@ -140,6 +140,7 @@ public abstract class AbstractBasicClient implements BasicClient { @Override public void waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { ensureClientIsPresent(); + logger.info("waiting for cluster status " + statusString + " for " + maxWaitTime + " " + timeUnit); ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() @@ -165,7 +166,7 @@ public abstract class AbstractBasicClient implements BasicClient { ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); if (healthResponse.isTimedOut()) { - String message = "timeout waiting for cluster shards"; + String message = "timeout waiting for cluster shards: " + timeout; logger.error(message); throw new IllegalStateException(message); } @@ -257,8 +258,8 @@ public abstract class AbstractBasicClient implements BasicClient { protected boolean isIndexDefinitionDisabled(IndexDefinition indexDefinition) { if (!indexDefinition.isEnabled()) { - logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); - return true; + logger.warn("index " + indexDefinition.getFullIndexName() + " is disabled"); + return true; } return false; } diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 3df6d20..55ad8f8 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -48,7 +48,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } @Override - public BulkProcessor getBulkController() { + public BulkProcessor getBulkProcessor() { return bulkProcessor; } @@ -100,14 +100,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); createIndexRequestBuilder.setSettings(settings); - // must be Map to match prototype of addMapping()! - Map mappings = indexDefinition.getMappings() == null ? null : - JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); - if (mappings != null) { + if (indexDefinition.getMappings() != null) { + Map mappings = JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); createIndexRequestBuilder.addMapping(type, mappings); } else { - createIndexRequestBuilder.addMapping(type, - JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject()); + XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(type).endObject().endObject(); + createIndexRequestBuilder.addMapping(type, builder); } CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); if (createIndexResponse.isAcknowledged()) { @@ -117,8 +115,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements return; } // we really need state GREEN. If yellow, we may trigger shard write errors and queue will exceed quickly. - logger.info("waiting for GREEN after index {} was created", index); - waitForCluster("GREEN", indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); + waitForCluster("GREEN", 300L, TimeUnit.SECONDS); } @Override @@ -126,17 +123,19 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (isIndexDefinitionDisabled(indexDefinition)) { return; } - ensureClientIsPresent(); - Long bulkQueueSize = getThreadPoolQueueSize("bulk"); - if (bulkQueueSize != null && bulkQueueSize <= 64) { - logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); - bulkQueueSize = bulkQueueSize * 4; - } else { - logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); - bulkQueueSize = 256L; + if (bulkProcessor != null) { + ensureClientIsPresent(); + Long bulkQueueSize = getThreadPoolQueueSize("bulk"); + if (bulkQueueSize != null && bulkQueueSize <= 64) { + logger.info("found bulk queue size " + bulkQueueSize + ", expanding to " + bulkQueueSize * 4); + bulkQueueSize = bulkQueueSize * 4; + } else { + logger.warn("undefined or small bulk queue size found: " + bulkQueueSize + " assuming 256"); + bulkQueueSize = 256L; + } + putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); + bulkProcessor.startBulkMode(indexDefinition); } - putClusterSetting("threadpool.bulk.queue_size", bulkQueueSize, 30L, TimeUnit.SECONDS); - bulkProcessor.startBulkMode(indexDefinition); } @Override @@ -144,8 +143,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (isIndexDefinitionDisabled(indexDefinition)) { return; } - ensureClientIsPresent(); - bulkProcessor.stopBulkMode(indexDefinition); + if (bulkProcessor != null) { + ensureClientIsPresent(); + bulkProcessor.stopBulkMode(indexDefinition); + } } @Override @@ -166,8 +167,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexRequest indexRequest) { - ensureClientIsPresent(); - bulkProcessor.add(indexRequest); + if (bulkProcessor != null) { + ensureClientIsPresent(); + bulkProcessor.add(indexRequest); + } return this; } @@ -184,8 +187,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient delete(DeleteRequest deleteRequest) { - ensureClientIsPresent(); - bulkProcessor.add(deleteRequest); + if (bulkProcessor != null) { + ensureClientIsPresent(); + bulkProcessor.add(deleteRequest); + } return this; } @@ -214,8 +219,11 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public boolean waitForResponses(long timeout, TimeUnit timeUnit) { - ensureClientIsPresent(); - return bulkProcessor.waitForBulkResponses(timeout, timeUnit); + if (bulkProcessor != null) { + ensureClientIsPresent(); + return bulkProcessor.waitForBulkResponses(timeout, timeUnit); + } + return true; } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java index 40b472b..86ede0f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractSearchClient.java @@ -60,10 +60,10 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { + super.close(); if (searchMetric != null) { searchMetric.close(); } - super.close(); } } @@ -137,9 +137,24 @@ public abstract class AbstractSearchClient extends AbstractBasicClient implement SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE); queryBuilder.accept(searchRequestBuilder); searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); - SearchResponse initialSearchResponse = searchRequestBuilder.execute().actionGet(); + searchRequestBuilder.setScroll(scrollTime).setSize(scrollSize); + ActionFuture actionFuture = searchRequestBuilder.execute(); + searchMetric.getCurrentQueries().inc(); + SearchResponse initialSearchResponse = actionFuture.actionGet(); + searchMetric.getCurrentQueries().dec(); + searchMetric.getQueries().inc(); + searchMetric.markTotalQueries(1); + if (initialSearchResponse.getFailedShards() > 0) { + searchMetric.getFailedQueries().inc(); + } else if (initialSearchResponse.isTimedOut()) { + searchMetric.getTimeoutQueries().inc(); + } else if (initialSearchResponse.getHits().getTotalHits() == 0) { + searchMetric.getEmptyQueries().inc(); + } else { + searchMetric.getSucceededQueries().inc(); + } Stream responseStream = Stream.iterate(initialSearchResponse, - searchResponse -> { + searchResponse -> { SearchScrollRequestBuilder searchScrollRequestBuilder = new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE) .setScrollId(searchResponse.getScrollId()) diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 0d8ceeb..43348ff 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -68,6 +68,7 @@ public class DefaultBulkListener implements BulkListener { long l = bulkMetric.getCurrentIngest().getCount(); bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); + bulkMetric.markTotalIngest(response.getItems().length); int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 0bc417b..2b4dbbf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -102,7 +102,8 @@ public class DefaultBulkProcessor implements BulkProcessor { int interval = indexDefinition.getStartBulkRefreshSeconds(); if (interval != 0) { logger.info("starting bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); + bulkClient.updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); } else { logger.warn("ignoring starting bulk on " + indexName + " with refresh interval " + interval); } @@ -116,7 +117,8 @@ public class DefaultBulkProcessor implements BulkProcessor { if (waitForBulkResponses(indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit())) { if (interval != 0) { logger.info("stopping bulk on " + indexName + " with new refresh interval " + interval); - bulkClient.updateIndexSetting(indexName, "refresh_interval", interval + "s", 30L, TimeUnit.SECONDS); + bulkClient.updateIndexSetting(indexName, "refresh_interval", + interval >= 0 ? interval + "s" : interval, 30L, TimeUnit.SECONDS); } else { logger.warn("ignoring stopping bulk on " + indexName + " with refresh interval " + interval); } @@ -134,8 +136,8 @@ public class DefaultBulkProcessor implements BulkProcessor { } @Override - public void setMaxBulkVolume(long bulkSize) { - this.bulkVolume = bulkSize; + public void setMaxBulkVolume(long bulkVolume) { + this.bulkVolume = bulkVolume; } @Override diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java index 0e11135..6444819 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/AliasTest.java @@ -43,13 +43,13 @@ class AliasTest { @Test void testAlias() { - ElasticsearchClient client = helper.client("1"); + ElasticsearchClient client = helper.client(); CreateIndexRequest indexRequest = new CreateIndexRequest("test_index"); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); client.execute(RefreshAction.INSTANCE, new RefreshRequest()).actionGet(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); - String[] indices = new String[]{"test_index"}; - String[] aliases = new String[]{"test_alias"}; + String[] indices = { "test_index" }; + String[] aliases = { "test_alias" }; IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); indicesAliasesRequest.addAliasAction(aliasAction); @@ -67,7 +67,7 @@ class AliasTest { @Test void testMostRecentIndex() { - ElasticsearchClient client = helper.client("1"); + ElasticsearchClient client = helper.client(); String alias = "test"; CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101"); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); @@ -76,7 +76,11 @@ class AliasTest { indexRequest = new CreateIndexRequest("test20160103"); client.execute(CreateIndexAction.INSTANCE, indexRequest).actionGet(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); - String[] indices = new String[] { "test20160101", "test20160102", "test20160103" }; + String[] indices = { + "test20160101", + "test20160102", + "test20160103" + }; String[] aliases = new String[] { alias }; IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases); diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java index 95ab468..1d552ae 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SearchTest.java @@ -29,7 +29,7 @@ class SearchTest { @Test void testSearch() throws Exception { - ElasticsearchClient client = helper.client("1"); + ElasticsearchClient client = helper.client(); BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE); for (int i = 0; i < 1; i++) { IndexRequest indexRequest = new IndexRequest().index("pages").type("row") diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java index c1bdb9a..757f1af 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/SimpleTest.java @@ -33,7 +33,7 @@ class SimpleTest { try { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices("test"); - helper.client("1").execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); + helper.client().execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); } catch (IndexNotFoundException e) { // ignore if index not found } @@ -44,22 +44,22 @@ class SimpleTest { .build(); CreateIndexRequest createIndexRequest = new CreateIndexRequest(); createIndexRequest.index("test").settings(indexSettings); - helper.client("1").execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); + helper.client().execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); IndexRequest indexRequest = new IndexRequest(); indexRequest.index("test").type("test").id("1") .source(XContentFactory.jsonBuilder().startObject().field("field", "1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8").endObject()); - helper.client("1").execute(IndexAction.INSTANCE, indexRequest).actionGet(); + helper.client().execute(IndexAction.INSTANCE, indexRequest).actionGet(); RefreshRequest refreshRequest = new RefreshRequest(); refreshRequest.indices("test"); - helper.client("1").execute(RefreshAction.INSTANCE, refreshRequest).actionGet(); + helper.client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet(); SearchSourceBuilder builder = new SearchSourceBuilder(); builder.query(QueryBuilders.matchQuery("field", "1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8")); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("test").types("test"); searchRequest.source(builder); - String doc = helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet() + String doc = helper.client().execute(SearchAction.INSTANCE, searchRequest).actionGet() .getHits().getAt(0).getSourceAsString(); assertEquals(doc, "{\"field\":\"1%2fPJJP3JV2C24iDfEu9XpHBaYxXh%2fdHTbmchB35SDznXO2g8Vz4D7GTIvY54iMiX_149c95f02a8\"}"); diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java index 2b562f7..d567d6d 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/TestExtension.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -33,8 +32,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; -import java.util.HashMap; -import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -72,9 +69,9 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Helper helper = extensionContext.getParent().get().getStore(ns) .getOrComputeIfAbsent(key + count.get(), key -> create(), Helper.class); logger.info("starting cluster with helper " + helper + " at " + helper.getHome()); - helper.startNode("1"); + helper.startNode(); try { - ClusterHealthResponse healthResponse = helper.client("1").execute(ClusterHealthAction.INSTANCE, + ClusterHealthResponse healthResponse = helper.client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) .timeout(TimeValue.timeValueSeconds(30))).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { @@ -86,7 +83,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateResponse clusterStateResponse = - helper.client("1").execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + helper.client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); logger.info("cluster name = {}", clusterStateResponse.getClusterName().value()); } @@ -101,15 +98,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } private void closeNodes(Helper helper) { - logger.info("closing all clients"); - for (AbstractClient client : helper.clients.values()) { - client.close(); - } - logger.info("closing all nodes"); - for (Node node : helper.nodes.values()) { - if (node != null) { - node.close(); - } + if (helper.node != null) { + helper.node.close(); } logger.info("all nodes closed"); } @@ -151,9 +141,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft int port; - Map nodes = new HashMap<>(); - - Map clients = new HashMap<>(); + Node node; void setHome(String home) { this.home = home; @@ -178,10 +166,10 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .build(); } - void startNode(String id) { - buildNode(id).start(); + void startNode() { + buildNode().start(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = client(id). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = client(). execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); Object obj = response.iterator().next().getTransport().getAddress() .publishAddress(); if (obj instanceof InetSocketTransportAddress) { @@ -191,8 +179,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft } } - ElasticsearchClient client(String id) { - return clients.get(id); + ElasticsearchClient client() { + return node.client(); } String randomString(int len) { @@ -204,16 +192,12 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return new String(buf); } - private Node buildNode(String id) { + private Node buildNode() { Settings nodeSettings = Settings.builder() .put(getNodeSettings()) - .put("node.name", id) + .put("node.name", "1") .build(); - Node node = new MockNode(nodeSettings); - AbstractClient client = (AbstractClient) node.client(); - nodes.put(id, node); - clients.put(id, client); - return node; + return new MockNode(nodeSettings); } } } diff --git a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java index 4fa10e0..47f34af 100644 --- a/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java +++ b/elx-common/src/test/java/org/xbib/elx/common/test/WildcardTest.java @@ -26,7 +26,7 @@ class WildcardTest { @Test void testWildcard() throws Exception { - ElasticsearchClient client = helper.client("1"); + ElasticsearchClient client = helper.client(); index(client, "1", "010"); index(client, "2", "0*0"); // exact diff --git a/elx-common/src/test/resources/log4j2.xml b/elx-common/src/test/resources/log4j2-test.xml similarity index 100% rename from elx-common/src/test/resources/log4j2.xml rename to elx-common/src/test/resources/log4j2-test.xml diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java index 3e2fdc1..7b73a19 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeAdminClient.java @@ -15,7 +15,7 @@ public class NodeAdminClient extends AbstractAdminClient { } @Override - protected ElasticsearchClient createClient(Settings settings) { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index 366d2d4..566d99e 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -15,7 +15,7 @@ public class NodeBulkClient extends AbstractBulkClient { } @Override - protected ElasticsearchClient createClient(Settings settings) { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index 46751fa..5f1147c 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -36,10 +36,10 @@ public class NodeClientHelper { key -> innerCreateClient(settings)); } - public void closeClient(Settings settings) { + public void closeClient(Settings settings) throws IOException { ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); if (client != null) { - logger.debug("closing node"); + logger.debug("closing node..."); node.close(); node = null; } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index fbb341c..cfcf2fe 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -15,7 +15,7 @@ public class NodeSearchClient extends AbstractSearchClient { } @Override - protected ElasticsearchClient createClient(Settings settings) { + public ElasticsearchClient createClient(Settings settings) { return helper.createClient(settings, null); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 158d2aa..6de3be9 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -2,15 +2,11 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.node.NodeAdminClient; -import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -21,14 +17,13 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class BulkClientTest { private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName()); - private static final Long ACTIONS = 10000L; + private static final Long ACTIONS = 100000L; private final TestExtension.Helper helper; @@ -36,67 +31,39 @@ class BulkClientTest { this.helper = helper; } + @Test + void testNewIndex() throws Exception { + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) + .setBulkClientProvider(NodeBulkClientProvider.class) + .put(helper.getNodeSettings()) + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + } + } + @Test void testSingleDoc() throws Exception { - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); - } - } - - @Test - void testNewIndex() throws Exception { - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) - .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) - .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - bulkClient.newIndex(indexDefinition); - } - } - - @Test - void testMapping() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) - .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) - .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) - .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) - .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); - indexDefinition.setMappings(builder.string()); - bulkClient.newIndex(indexDefinition); - assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { @@ -109,11 +76,11 @@ class BulkClientTest { } bulkClient.stopBulk(indexDefinition); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } @@ -124,7 +91,7 @@ class BulkClientTest { int maxthreads = Runtime.getRuntime().availableProcessors(); final long actions = ACTIONS; long timeout = 120L; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { @@ -150,13 +117,13 @@ class BulkClientTest { logger.error("latch timeout!"); } bulkClient.stopBulk(indexDefinition); - assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index e7dce75..b9f3e02 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -35,7 +35,7 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) @@ -46,15 +46,14 @@ class DuplicateIDTest { bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index b4ccc54..5560139 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -14,6 +14,7 @@ import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -37,15 +38,16 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getNodeSettings()) .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition("test_prune", "doc"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); @@ -65,8 +67,6 @@ class IndexPruneTest { IndexRetention indexRetention = new DefaultIndexRetention(); indexRetention.setDelta(2); indexRetention.setMinToKeep(2); - indexDefinition.setIndex("test_prune"); - indexDefinition.setFullIndexName("test_prune4"); indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); indexDefinition.setPrune(true); @@ -87,10 +87,10 @@ class IndexPruneTest { assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index a5dd7c4..49545df 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -36,11 +36,11 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { @@ -51,8 +51,9 @@ class IndexShiftTest { bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + indexDefinition.setIndex("test"); + indexDefinition.setFullIndexName("test_shift"); indexDefinition.setShift(true); IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); @@ -60,27 +61,26 @@ class IndexShiftTest { assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - Map aliases = adminClient.getAliases("test_shift"); + Map aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); - Optional resolved = adminClient.resolveAlias("test").stream().findFirst(); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); + Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); indexDefinition.setFullIndexName("test_shift2"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - indexDefinition.setFullIndexName("test_shift2"); + indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, index, alias).filter(QueryBuilders.termQuery("my_key", alias))) @@ -91,7 +91,7 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - aliases = adminClient.getAliases("test_shift2"); + aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); @@ -106,10 +106,10 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index bb62788..6fa4b2b 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -13,7 +13,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; import org.xbib.elx.node.NodeSearchClient; @@ -29,8 +28,6 @@ class SearchTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; - private final TestExtension.Helper helper; SearchTest(TestExtension.Helper helper) { @@ -41,10 +38,9 @@ class SearchTest { void testDocStream() throws Exception { long numactions = ACTIONS; IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -56,13 +52,13 @@ class SearchTest { assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } - try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client) + try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client()) .setSearchClientProvider(NodeSearchClientProvider.class) .put(helper.getNodeSettings()) .build()) { @@ -70,7 +66,7 @@ class SearchTest { Stream stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), - TimeValue.timeValueMillis(100), 570); + TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); @@ -92,13 +88,13 @@ class SearchTest { .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); final AtomicInteger idcount = new AtomicInteger(); - ids.forEach(id -> { - idcount.incrementAndGet(); - }); + ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 9dcac97..1a40185 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -3,6 +3,8 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; @@ -32,13 +34,13 @@ class SmokeTest { @Test void smokeTest() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) .setAdminClientProvider(NodeAdminClientProvider.class) .put(helper.getNodeSettings()) .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) + .put(helper.getNodeSettings()) .build()) { assertEquals(helper.getClusterName(), adminClient.getClusterName()); IndexDefinition indexDefinition = @@ -54,28 +56,37 @@ class SmokeTest { adminClient.checkMapping(indexDefinition); bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.delete(indexDefinition, "1"); - bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.deleteIndex(indexDefinition); bulkClient.newIndex(indexDefinition); bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.flush(); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); adminClient.deleteIndex(indexDefinition); - logger.info("done"); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + indexDefinition.setMappings(builder.string()); + bulkClient.newIndex(indexDefinition); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index 8b46b96..16fa2e0 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -12,7 +12,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -121,8 +121,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft Node node; - AbstractClient client; - void setHome(String home) { this.home = home; } @@ -150,7 +148,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft void startNode() { buildNode().start(); NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true); - NodesInfoResponse response = client.execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); + NodesInfoResponse response = client().execute(NodesInfoAction.INSTANCE, nodesInfoRequest).actionGet(); Object obj = response.iterator().next().getTransport().getAddress().publishAddress(); if (obj instanceof InetSocketTransportAddress) { InetSocketTransportAddress address = (InetSocketTransportAddress) obj; @@ -175,16 +173,15 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .put(getNodeSettings()) .put("node.name", id) .build(); - node = new MockNode(nodeSettings); - client = (AbstractClient) node.client(); + this.node = new MockNode(nodeSettings); return node; } + ElasticsearchClient client() { + return node.client(); + } + void closeNodes() { - if (client != null) { - logger.info("closing client"); - client.close(); - } if (node != null) { logger.info("closing all nodes"); node.close(); @@ -193,7 +190,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft void greenHealth() throws IOException { try { - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, + ClusterHealthResponse healthResponse = client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN) .timeout(TimeValue.timeValueSeconds(30))).actionGet(); if (healthResponse != null && healthResponse.isTimedOut()) { @@ -208,7 +205,7 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft String clusterName() { ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); ClusterStateResponse clusterStateResponse = - client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); + client().execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); return clusterStateResponse.getClusterName().value(); } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java b/elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java index 35f85ac..0ec35f7 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/NetworkUtils.java @@ -16,9 +16,6 @@ import java.util.Enumeration; import java.util.List; import java.util.Locale; -/** - * - */ public class NetworkUtils { private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName()); diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java index 9b4a158..08b3cbc 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportAdminClient.java @@ -31,7 +31,7 @@ public class TransportAdminClient extends AbstractAdminClient { } @Override - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java index 8441393..b591fc2 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportBulkClient.java @@ -30,7 +30,7 @@ public class TransportBulkClient extends AbstractBulkClient { } @Override - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { helper.closeClient(settings); } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java index c3ce202..53bc8a7 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportSearchClient.java @@ -30,7 +30,7 @@ public class TransportSearchClient extends AbstractSearchClient { } @Override - public void closeClient(Settings settings) throws IOException { + public void closeClient(Settings settings) { helper.closeClient(settings); } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index e5235aa..76fd9a3 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -2,15 +2,11 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.transport.TransportAdminClient; -import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -21,7 +17,6 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class BulkClientTest { @@ -36,25 +31,6 @@ class BulkClientTest { this.helper = helper; } - @Test - void testSingleDoc() throws Exception { - try (TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build()) { - IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); - } - assertNull(bulkClient.getBulkController().getLastBulkError()); - } - } - @Test void testNewIndex() throws Exception { try (TransportBulkClient bulkClient = ClientBuilder.builder() @@ -67,29 +43,20 @@ class BulkClientTest { } @Test - void testMapping() throws Exception { - try (TransportAdminClient adminClient = ClientBuilder.builder() - .setAdminClientProvider(TransportAdminClientProvider.class) + void testSingleDoc() throws Exception { + try (TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .build(); - TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); + .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); - indexDefinition.setMappings(builder.string()); bulkClient.newIndex(indexDefinition); - assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + assertEquals(1, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); + } + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } @@ -106,24 +73,22 @@ class BulkClientTest { bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } } @Test - void testThreadedRandomDocs() { - int maxthreads = Runtime.getRuntime().availableProcessors(); - //long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; + void testThreadedRandomDocs() throws Exception { + final int maxthreads = Runtime.getRuntime().availableProcessors(); final long actions = ACTIONS; - long timeout = 120L; + final long timeout = 120L; try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) @@ -153,15 +118,13 @@ class BulkClientTest { logger.error("latch timeout!"); } bulkClient.stopBulk(indexDefinition); - assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + assertEquals(maxthreads * actions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); bulkClient.refreshIndex(indexDefinition); assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); - } catch (Exception e) { - logger.error(e.getMessage(), e); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index d7a3e5e..437850c 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -7,7 +7,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import java.util.concurrent.TimeUnit; @@ -22,8 +21,6 @@ class DuplicateIDTest { private static final Long ACTIONS = 100L; - private static final Long MAX_ACTIONS_PER_REQUEST = 5L; - private final TestExtension.Helper helper; DuplicateIDTest(TestExtension.Helper helper) { @@ -36,7 +33,6 @@ class DuplicateIDTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); bulkClient.newIndex(indexDefinition); @@ -44,15 +40,14 @@ class DuplicateIDTest { bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); bulkClient.refreshIndex(indexDefinition); assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index aa9bc61..ea509af 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -47,6 +47,7 @@ class IndexPruneTest { .put(helper.getTransportSettings()) .build()) { IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); @@ -64,10 +65,6 @@ class IndexPruneTest { indexDefinition.setShift(true); adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); IndexRetention indexRetention = new DefaultIndexRetention(); - indexRetention.setDelta(2); - indexRetention.setMinToKeep(2); - indexDefinition.setIndex("test_prune"); - indexDefinition.setFullIndexName("test_prune4"); indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); indexDefinition.setPrune(true); @@ -88,10 +85,10 @@ class IndexPruneTest { assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index db0f692..8f1f4c1 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -53,7 +53,6 @@ class IndexShiftTest { bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); indexDefinition.setShift(true); IndexShiftResult indexShiftResult = @@ -62,27 +61,26 @@ class IndexShiftTest { assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - Map aliases = adminClient.getAliases("test_shift"); + Map aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); - Optional resolved = adminClient.resolveAlias("test").stream().findFirst(); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); + Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); indexDefinition.setFullIndexName("test_shift2"); bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - indexDefinition.setFullIndexName("test_shift2"); + indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, index, alias).filter(QueryBuilders.termQuery("my_key", alias))) @@ -93,14 +91,14 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - aliases = adminClient.getAliases("test_shift2"); + aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test").stream().findFirst(); + resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); @@ -108,10 +106,10 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index 9d36609..3d254f6 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -10,7 +10,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.DefaultIndexDefinition; -import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportSearchClient; @@ -31,8 +30,6 @@ class SearchTest { private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 100L; - private final TestExtension.Helper helper; SearchTest(TestExtension.Helper helper) { @@ -46,7 +43,6 @@ class SearchTest { try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { bulkClient.newIndex(indexDefinition); bulkClient.startBulk(indexDefinition); @@ -58,11 +54,11 @@ class SearchTest { assertTrue(bulkClient.waitForResponses(30L, TimeUnit.SECONDS)); bulkClient.refreshIndex(indexDefinition); assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); } try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) @@ -72,7 +68,7 @@ class SearchTest { Stream stream = searchClient.search(qb -> qb .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), - TimeValue.timeValueMillis(100), 570); + TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); @@ -94,13 +90,13 @@ class SearchTest { .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); final AtomicInteger idcount = new AtomicInteger(); - ids.forEach(id -> { - idcount.incrementAndGet(); - }); + ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); - assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 93813a5..bb917be 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -3,6 +3,8 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; @@ -41,9 +43,9 @@ class SmokeTest { .put(helper.getTransportSettings()) .build()) { IndexDefinition indexDefinition = - new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); - assertEquals("test_smoke", indexDefinition.getIndex()); - assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); + new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); + assertEquals("test", indexDefinition.getIndex()); + assertTrue(indexDefinition.getFullIndexName().startsWith("test")); assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); bulkClient.newIndex(indexDefinition); @@ -63,13 +65,26 @@ class SmokeTest { adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - assertEquals(0, bulkClient.getBulkController().getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); + assertEquals(0, bulkClient.getBulkProcessor().getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkProcessor().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkProcessor().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkProcessor().getLastBulkError()); } - assertNull(bulkClient.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkProcessor().getLastBulkError()); adminClient.deleteIndex(indexDefinition); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + indexDefinition.setMappings(builder.string()); + bulkClient.newIndex(indexDefinition); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } }