From 794446534ce6bc337a1283e8341736b81826ed47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Fri, 22 May 2020 16:38:23 +0200 Subject: [PATCH] adjust to elx API update on 6.3 branch --- .../java/org/xbib/elx/api/AdminClient.java | 15 +- .../java/org/xbib/elx/api/BulkMetric.java | 3 + .../java/org/xbib/elx/api/NativeClient.java | 2 +- .../org/xbib/elx/api/ReadClientProvider.java | 6 - .../{ReadClient.java => SearchClient.java} | 0 .../xbib/elx/common/AbstractAdminClient.java | 500 +++--------------- .../xbib/elx/common/AbstractBulkClient.java | 32 +- .../xbib/elx/common/AbstractNativeClient.java | 35 +- .../elx/common/DefaultBulkController.java | 149 +----- .../xbib/elx/common/DefaultBulkListener.java | 2 +- .../xbib/elx/common/DefaultBulkMetric.java | 5 + .../xbib/elx/common/DefaultBulkProcessor.java | 2 +- .../org/xbib/elx/common/MockAdminClient.java | 4 +- .../org/xbib/elx/common/MockBulkClient.java | 4 + .../org/xbib/elx/common/MockSearchClient.java | 6 + .../org/xbib/elx/node/NodeBulkClient.java | 8 +- .../org/xbib/elx/node/NodeSearchClient.java | 8 +- .../xbib/elx/node/test/BulkClientTest.java | 35 +- .../xbib/elx/node/test/DuplicateIDTest.java | 35 +- .../xbib/elx/node/test/IndexPruneTest.java | 18 +- .../xbib/elx/node/test/IndexShiftTest.java | 51 +- .../org/xbib/elx/node/test/SearchTest.java | 80 +++ .../org/xbib/elx/node/test/SmokeTest.java | 68 +-- .../elx/transport/TransportClientHelper.java | 92 +--- .../elx/transport/test/BulkClientTest.java | 37 +- .../elx/transport/test/DuplicateIDTest.java | 52 +- .../elx/transport/test/IndexPruneTest.java | 20 +- .../elx/transport/test/IndexShiftTest.java | 49 +- .../xbib/elx/transport/test/SearchTest.java | 84 +++ .../xbib/elx/transport/test/SmokeTest.java | 68 ++- .../resources/{log4j2.xml => log4j2-test.xml} | 0 31 files changed, 540 insertions(+), 930 deletions(-) delete mode 100644 elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java rename elx-api/src/main/java/org/xbib/elx/api/{ReadClient.java => SearchClient.java} (100%) create mode 100644 elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java create mode 100644 elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java rename elx-transport/src/test/resources/{log4j2.xml => log4j2-test.xml} (100%) diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index bdb5b5d..fdfcb89 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -22,6 +22,10 @@ public interface AdminClient extends NativeClient { */ IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException; + Map getMapping(String index) throws IOException; + + Map getMapping(String index, String type) throws IOException; + /** * Delete an index. * @param indexDefinition the index definition @@ -89,17 +93,6 @@ public interface AdminClient extends NativeClient { */ boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit); - - /** - * Wait for index recovery (after replica change). - * - * @param index index - * @param maxWaitTime maximum wait time - * @param timeUnit time unit - * @return true if wait succeeded, false if wait timed out - */ - boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit); - /** * Resolve alias. * diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index 7e84376..af825e5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,5 +1,6 @@ package org.xbib.elx.api; +import org.elasticsearch.common.settings.Settings; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -7,6 +8,8 @@ import java.io.Closeable; public interface BulkMetric extends Closeable { + void init(Settings settings); + Metered getTotalIngest(); Count getTotalIngestSizeInBytes(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java b/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java index 52fbbf3..243be58 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/NativeClient.java @@ -55,7 +55,7 @@ public interface NativeClient extends Closeable { */ void waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit); - Map getMapping(String index, String mapping); + void waitForShards(long maxWaitTime, TimeUnit timeUnit); long getSearchableDocs(String index); diff --git a/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java b/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java deleted file mode 100644 index bc0eb16..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/ReadClientProvider.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.xbib.elx.api; - -public interface ReadClientProvider { - - C getReadClient(); -} diff --git a/elx-api/src/main/java/org/xbib/elx/api/ReadClient.java b/elx-api/src/main/java/org/xbib/elx/api/SearchClient.java similarity index 100% rename from elx-api/src/main/java/org/xbib/elx/api/ReadClient.java rename to elx-api/src/main/java/org/xbib/elx/api/SearchClient.java diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 63923f8..b97cc8e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -4,9 +4,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -26,22 +23,24 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.DeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -53,9 +52,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkMetric; -import org.xbib.elx.api.ExtendedClient; import org.xbib.elx.api.IndexAliasAdder; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexPruneResult; @@ -71,19 +70,11 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -92,7 +83,7 @@ import java.util.stream.Collectors; public abstract class AbstractAdminClient extends AbstractNativeClient implements AdminClient { - private static final Logger logger = LogManager.getLogger(AbstractExtendedClient.class.getName()); + private static final Logger logger = LogManager.getLogger(AbstractAdminClient.class.getName()); /** * The one and only index type name used in the extended client. @@ -100,17 +91,6 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement */ private static final String TYPE_NAME = "doc"; - /** - * The Elasticsearch client. - */ - private ElasticsearchClient client; - - private BulkMetric bulkMetric; - - private BulkController bulkController; - - private AtomicBoolean closed; - private static final IndexShiftResult EMPTY_INDEX_SHIFT_RESULT = new IndexShiftResult() { @Override public List getMovedAliases() { @@ -146,348 +126,53 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement }; @Override - public AbstractExtendedClient setClient(ElasticsearchClient client) { - this.client = client; - return this; + public Map getMapping(String index) throws IOException { + return getMapping(index, TYPE_NAME); } @Override - public ElasticsearchClient getClient() { - return client; + public Map getMapping(String index, String mapping) throws IOException { + GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) + .setIndices(index) + .setTypes(mapping); + GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); + logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap()); + return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap(); } @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - - @Override - public BulkController getBulkController() { - return bulkController; - } - - @Override - public AbstractExtendedClient init(Settings settings) throws IOException { - logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); - if (client == null) { - client = createClient(settings); - } - if (bulkMetric == null) { - bulkMetric = new DefaultBulkMetric(); - bulkMetric.init(settings); - } - if (bulkController == null) { - bulkController = new DefaultBulkController(this, bulkMetric); - bulkController.init(settings); - } - return this; - } - - @Override - public void flush() throws IOException { - if (bulkController != null) { - bulkController.flush(); - } - } - - @Override - public void close() throws IOException { - ensureActive(); - if (closed.compareAndSet(false, true)) { - if (bulkMetric != null) { - logger.info("closing bulk metric"); - bulkMetric.close(); - } - if (bulkController != null) { - logger.info("closing bulk controller"); - bulkController.close(); - } - closeClient(); - } - } - - @Override - public String getClusterName() { - ensureActive(); - try { - ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear(); - ClusterStateResponse clusterStateResponse = - client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); - return clusterStateResponse.getClusterName().value(); - } catch (ElasticsearchTimeoutException e) { - logger.warn(e.getMessage(), e); - return "TIMEOUT"; - } catch (NoNodeAvailableException e) { - logger.warn(e.getMessage(), e); - return "DISCONNECTED"; - } catch (Exception e) { - logger.warn(e.getMessage(), e); - return "[" + e.getMessage() + "]"; - } - } - - @Override - public ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException { - ensureActive(); - waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); - URL indexSettings = indexDefinition.getSettingsUrl(); - if (indexSettings == null) { - logger.warn("warning while creating index '{}', no settings/mappings", - indexDefinition.getFullIndexName()); - newIndex(indexDefinition.getFullIndexName()); - return this; - } - URL indexMappings = indexDefinition.getMappingsUrl(); - if (indexMappings == null) { - logger.warn("warning while creating index '{}', no mappings", - indexDefinition.getFullIndexName()); - newIndex(indexDefinition.getFullIndexName(), indexSettings.openStream(), null); - return this; - } - try (InputStream indexSettingsInput = indexSettings.openStream(); - InputStream indexMappingsInput = indexMappings.openStream()) { - newIndex(indexDefinition.getFullIndexName(), indexSettingsInput, indexMappingsInput); - } catch (IOException e) { - if (indexDefinition.ignoreErrors()) { - logger.warn(e.getMessage(), e); - logger.warn("warning while creating index '{}' with settings at {} and mappings at {}", - indexDefinition.getFullIndexName(), indexSettings, indexMappings); - } else { - logger.error("error while creating index '{}' with settings at {} and mappings at {}", - indexDefinition.getFullIndexName(), indexSettings, indexMappings); - throw new IOException(e); - } - } - return this; - } - - @Override - public ExtendedClient newIndex(String index) throws IOException { - return newIndex(index, Settings.EMPTY, (Map) null); - } - - @Override - public ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException { - return newIndex(index, - Settings.builder().loadFromStream(".json", settings, true).build(), - mapping != null ? JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mapping).mapOrdered() : null); - } - - @Override - public ExtendedClient newIndex(String index, Settings settings) throws IOException { - return newIndex(index, settings, (Map) null); - } - - @Override - public ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException { - return newIndex(index, settings, - mapping != null ? JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mapping).mapOrdered() : null); - } - - @Override - public ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException { - ensureActive(); - if (index == null) { - logger.warn("no index name given to create index"); - return this; - } - CreateIndexRequest createIndexRequest = new CreateIndexRequest().index(index); - if (settings != null) { - createIndexRequest.settings(settings); - } - if (mapping != null) { - createIndexRequest.mapping(TYPE_NAME, mapping); - } - CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); - XContentBuilder builder = XContentFactory.jsonBuilder(); - logger.info("index {} created: {}", index, - Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS))); - return this; - } - - @Override - public ExtendedClient deleteIndex(IndexDefinition indexDefinition) { + public AdminClient deleteIndex(IndexDefinition indexDefinition) { return deleteIndex(indexDefinition.getFullIndexName()); } @Override - public ExtendedClient deleteIndex(String index) { - ensureActive(); + public AdminClient deleteIndex(String index) { + ensureClientIsPresent(); if (index == null) { logger.warn("no index name given to delete index"); return this; } DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); + waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); + waitForShards(30L, TimeUnit.SECONDS); return this; } @Override - public ExtendedClient startBulk(IndexDefinition indexDefinition) throws IOException { - startBulk(indexDefinition.getFullIndexName(), -1, 1); - return this; - } - - @Override - public ExtendedClient startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) - throws IOException { - if (bulkController != null) { - ensureActive(); - bulkController.startBulkMode(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds); - } - return this; - } - - @Override - public ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException { - if (bulkController != null) { - ensureActive(); - bulkController.stopBulkMode(indexDefinition); - } - return this; - } - - @Override - public ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException { - if (bulkController != null) { - ensureActive(); - bulkController.stopBulkMode(index, timeout, timeUnit); - } - return this; - } - - @Override - public ExtendedClient index(String index, String id, boolean create, String source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) - .source(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON)); - } - - @Override - public ExtendedClient index(String index, String id, boolean create, BytesReference source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) - .source(source, XContentType.JSON)); - } - - @Override - public ExtendedClient index(IndexRequest indexRequest) { - ensureActive(); - bulkController.index(indexRequest); - return this; - } - - @Override - public ExtendedClient delete(String index, String id) { - return delete(new DeleteRequest(index, TYPE_NAME, id)); - } - - @Override - public ExtendedClient delete(DeleteRequest deleteRequest) { - ensureActive(); - bulkController.delete(deleteRequest); - return this; - } - - @Override - public ExtendedClient update(String index, String id, BytesReference source) { - return update(new UpdateRequest(index, TYPE_NAME, id) - .doc(source, XContentType.JSON)); - } - - @Override - public ExtendedClient update(String index, String id, String source) { - return update(new UpdateRequest(index, TYPE_NAME, id) - .doc(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON)); - } - - @Override - public ExtendedClient update(UpdateRequest updateRequest) { - ensureActive(); - bulkController.update(updateRequest); - return this; - } - - @Override - public boolean waitForResponses(long timeout, TimeUnit timeUnit) { - ensureActive(); - return bulkController.waitForResponses(timeout, timeUnit); - } - - @Override - public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { - ensureActive(); - ensureIndexGiven(index); - GetSettingsRequest settingsRequest = new GetSettingsRequest(); - settingsRequest.indices(index); - GetSettingsResponse settingsResponse = client.execute(GetSettingsAction.INSTANCE, settingsRequest).actionGet(); - int shards = settingsResponse.getIndexToSettings().get(index).getAsInt("index.number_of_shards", -1); - if (shards > 0) { - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - .indices(index) - .waitForActiveShards(shards) - .timeout(timeout); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - logger.error("timeout waiting for recovery"); - return false; - } - } - return true; - } - - @Override - public boolean waitForCluster(String statusString, long maxWaitTime, TimeUnit timeUnit) { - ensureActive(); - ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString); - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().timeout(timeout).waitForStatus(status)).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - if (logger.isErrorEnabled()) { - logger.error("timeout, cluster state is " + healthResponse.getStatus().name() + " and not " + status.name()); - } - return false; - } - return true; - } - - @Override - public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { - ensureActive(); - try { - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthResponse healthResponse = client.execute(ClusterHealthAction.INSTANCE, - new ClusterHealthRequest().timeout(timeout)).actionGet(); - ClusterHealthStatus status = healthResponse.getStatus(); - return status.name(); - } catch (ElasticsearchTimeoutException e) { - logger.warn(e.getMessage(), e); - return "TIMEOUT"; - } catch (NoNodeAvailableException e) { - logger.warn(e.getMessage(), e); - return "DISCONNECTED"; - } catch (Exception e) { - logger.warn(e.getMessage(), e); - return "[" + e.getMessage() + "]"; - } - } - - @Override - public ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { + public AdminClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException { return updateReplicaLevel(indexDefinition.getFullIndexName(), level, indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit()); } @Override public AdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException { - waitForCluster("YELLOW", maxWaitTime, timeUnit); // let cluster settle down from critical operations - if (level > 0) { - updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); - waitForRecovery(index, maxWaitTime, timeUnit); + if (level < 1) { + logger.warn("invalid replica level"); + return this; } + updateIndexSetting(index, "number_of_replicas", level, maxWaitTime, timeUnit); + waitForShards(maxWaitTime, timeUnit); return this; } @@ -510,27 +195,9 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement return replica; } - @Override - public ExtendedClient flushIndex(String index) { - if (index != null) { - ensureActive(); - client.execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet(); - } - return this; - } - - @Override - public ExtendedClient refreshIndex(String index) { - if (index != null) { - ensureActive(); - client.execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet(); - } - return this; - } - @Override public String resolveMostRecentIndex(String alias) { - ensureActive(); + ensureClientIsPresent(); if (alias == null) { return null; } @@ -558,7 +225,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement @Override public String resolveAlias(String alias) { - ensureActive(); + ensureClientIsPresent(); ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.blocks(false); clusterStateRequest.metaData(true); @@ -569,7 +236,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement client.execute(ClusterStateAction.INSTANCE, clusterStateRequest).actionGet(); SortedMap map = clusterStateResponse.getState().getMetaData().getAliasAndIndexLookup(); AliasOrIndex aliasOrIndex = map.get(alias); - return aliasOrIndex != null ? aliasOrIndex.getIndices().iterator().next().getIndex().getName() : null; + return aliasOrIndex != null ? aliasOrIndex.getIndices().iterator().next().getIndex() : null; } @Override @@ -600,7 +267,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement @Override public IndexShiftResult shiftIndex(String index, String fullIndexName, List additionalAliases, IndexAliasAdder adder) { - ensureActive(); + ensureClientIsPresent(); if (index == null) { return EMPTY_INDEX_SHIFT_RESULT; // nothing to shift to } @@ -616,8 +283,8 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement final List moveAliases = new ArrayList<>(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); if (oldAliasMap == null || !oldAliasMap.containsKey(index)) { - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(fullIndexName).alias(index)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, index)); newAliases.add(index); } // move existing aliases @@ -625,14 +292,14 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement for (Map.Entry entry : oldAliasMap.entrySet()) { String alias = entry.getKey(); String filter = entry.getValue(); - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove() - .indices(oldIndex).alias(alias)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.REMOVE, + oldIndex, alias)); if (filter != null) { - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(fullIndexName).alias(alias).filter(filter)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, alias).filter(filter)); } else { - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(fullIndexName).alias(alias)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, alias)); } moveAliases.add(alias); } @@ -645,20 +312,20 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement if (adder != null) { adder.addIndexAlias(indicesAliasesRequest, fullIndexName, additionalAlias); } else { - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(fullIndexName).alias(additionalAlias)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, additionalAlias)); } newAliases.add(additionalAlias); } else { String filter = oldAliasMap.get(additionalAlias); - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove() - .indices(oldIndex).alias(additionalAlias)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.REMOVE, + oldIndex, additionalAlias)); if (filter != null) { - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(fullIndexName).alias(additionalAlias).filter(filter)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, additionalAlias).filter(filter)); } else { - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(fullIndexName).alias(additionalAlias)); + indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + fullIndexName, additionalAlias)); } moveAliases.add(additionalAlias); } @@ -668,8 +335,8 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement logger.debug("indices alias request = {}", indicesAliasesRequest.getAliasActions().toString()); IndicesAliasesResponse indicesAliasesResponse = client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); - logger.debug("response isAcknowledged = {} isFragment = {}", - indicesAliasesResponse.isAcknowledged(), indicesAliasesResponse.isFragment()); + logger.debug("response isAcknowledged = {}", + indicesAliasesResponse.isAcknowledged()); } return new SuccessIndexShiftResult(moveAliases, newAliases); } @@ -688,7 +355,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement if (index.equals(fullIndexName)) { return EMPTY_INDEX_PRUNE_RESULT; } - ensureActive(); + ensureClientIsPresent(); GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client, GetIndexAction.INSTANCE); GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet(); Pattern pattern = Pattern.compile("^(.*?)(\\d+)$"); @@ -733,11 +400,11 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement @Override public Long mostRecentDocument(String index, String timestampfieldname) { - ensureActive(); - SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); + ensureClientIsPresent(); + SortBuilder sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC); SearchSourceBuilder builder = new SearchSourceBuilder(); builder.sort(sort); - builder.storedField(timestampfieldname); + builder.field(timestampfieldname); builder.size(1); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(index); @@ -820,24 +487,10 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement @Override public void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException { - ensureActive(); + ensureClientIsPresent(); if (index == null) { throw new IOException("no index name given"); } - try { - URL url = new URL(string); - try (InputStream inputStream = url.openStream()) { - Settings settings = Settings.builder().loadFromStream(string, inputStream, true).build(); - XContentBuilder builder = JsonXContent.contentBuilder(); - settings.toXContent(builder, ToXContent.EMPTY_PARAMS); - return Strings.toString(builder); - } - } catch (MalformedURLException e) { - return string; - } - if (value == null) { - throw new IOException("no value given"); - } Settings.Builder updateSettingsBuilder = Settings.builder(); updateSettingsBuilder.put(key, value.toString()); UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index) @@ -845,26 +498,41 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement client.execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet(); } - private void ensureActive() { - if (this instanceof MockExtendedClient) { - return; + private static String findSettingsFrom(String string) throws IOException { + if (string == null) { + return null; + } + try { + URL url = new URL(string); + try (InputStream inputStream = url.openStream()) { + Settings settings = Settings.builder().loadFromStream(string, inputStream).build(); + XContentBuilder builder = JsonXContent.contentBuilder(); + settings.toXContent(builder, ToXContent.EMPTY_PARAMS); + return builder.string(); + } + } catch (MalformedURLException e) { + return string; + } + } + + private static String findMappingsFrom(String string) throws IOException { + if (string == null) { + return null; } try { URL url = new URL(string); try (InputStream inputStream = url.openStream()) { if (string.endsWith(".json")) { - Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered(); + Map mappings = JsonXContent.jsonXContent.createParser(inputStream).mapOrdered(); XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject().map(mappings).endObject(); - return Strings.toString(builder); + return builder.string(); } if (string.endsWith(".yml") || string.endsWith(".yaml")) { - Map mappings = YamlXContent.yamlXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered(); + Map mappings = YamlXContent.yamlXContent.createParser(inputStream).mapOrdered(); XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject().map(mappings).endObject(); - return Strings.toString(builder); + return builder.string(); } } return string; @@ -873,12 +541,6 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement } } - private void ensureIndexGiven(String index) { - if (index == null) { - throw new IllegalArgumentException("no index given"); - } - } - private Map getFilters(GetAliasesResponse getAliasesResponse) { Map result = new HashMap<>(); for (ObjectObjectCursor> object : getAliasesResponse.getAliases()) { @@ -896,7 +558,7 @@ public abstract class AbstractAdminClient extends AbstractNativeClient implement } public void checkMapping(String index) { - ensureActive(); + ensureClientIsPresent(); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index); GetMappingsResponse getMappingsResponse = client.execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); ImmutableOpenMap> map = getMappingsResponse.getMappings(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index e4c603d..1b03d02 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -13,12 +13,9 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.DeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -27,7 +24,6 @@ import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.IndexDefinition; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -91,10 +87,9 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public void newIndex(IndexDefinition indexDefinition) throws IOException { Settings settings = indexDefinition.getSettings() == null ? null : - Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build(); + Settings.builder().loadFromSource(indexDefinition.getSettings()).build(); Map mappings = indexDefinition.getMappings() == null ? null : - JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered(); + JsonXContent.jsonXContent.createParser(indexDefinition.getMappings()).mapOrdered(); newIndex(indexDefinition.getFullIndexName(), settings, mappings); } @@ -110,9 +105,8 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public void newIndex(String index, Settings settings, XContentBuilder builder) throws IOException { - String mappingString = Strings.toString(builder); - Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingString).mapOrdered(); + String mappingString = builder.string(); + Map mappings = JsonXContent.jsonXContent.createParser(mappingString).mapOrdered(); newIndex(index, settings, mappings); } @@ -134,7 +128,7 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements CreateIndexResponse createIndexResponse = client.execute(CreateIndexAction.INSTANCE, createIndexRequest).actionGet(); XContentBuilder builder = XContentFactory.jsonBuilder(); logger.info("index {} created: {}", index, - Strings.toString(createIndexResponse.toXContent(builder, ToXContent.EMPTY_PARAMS))); + createIndexResponse.toString()); } @Override @@ -169,20 +163,18 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public BulkClient index(String index, String id, boolean create, String source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) - .source(source.getBytes(StandardCharsets.UTF_8), XContentType.JSON)); + return index(index, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); } @Override public BulkClient index(String index, String id, boolean create, BytesReference source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) - .source(source, XContentType.JSON)); + return index(new IndexRequest(index, TYPE_NAME, id).create(create).source(source)); } @Override public BulkClient index(IndexRequest indexRequest) { ensureClientIsPresent(); - bulkController.index(indexRequest); + bulkController.bulkIndex(indexRequest); return this; } @@ -194,7 +186,7 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public BulkClient delete(DeleteRequest deleteRequest) { ensureClientIsPresent(); - bulkController.delete(deleteRequest); + bulkController.bulkDelete(deleteRequest); return this; } @@ -213,14 +205,14 @@ public abstract class AbstractBulkClient extends AbstractNativeClient implements @Override public BulkClient update(UpdateRequest updateRequest) { ensureClientIsPresent(); - bulkController.update(updateRequest); + bulkController.bulkUpdate(updateRequest); return this; } @Override public boolean waitForResponses(long timeout, TimeUnit timeUnit) { ensureClientIsPresent(); - return bulkController.waitForResponses(timeout, timeUnit); + return bulkController.waitForBulkResponses(timeout, timeUnit); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java index 5c8c599..36c65ab 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractNativeClient.java @@ -13,9 +13,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.search.SearchAction; @@ -28,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.xbib.elx.api.NativeClient; import java.io.IOException; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,7 +34,7 @@ public abstract class AbstractNativeClient implements NativeClient { /** * The one and only index type name used in the extended client. - * Notr that all Elasticsearch version < 6.2.0 do not allow a prepending "_". + * Note that all Elasticsearch versions before 6.2.0 do not allow a prepending "_". */ protected static final String TYPE_NAME = "doc"; @@ -108,6 +104,23 @@ public abstract class AbstractNativeClient implements NativeClient { throw new IllegalStateException(message); } } + @Override + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + ensureClientIsPresent(); + logger.info("waiting for cluster shard settling"); + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + //.waitForActiveShards(0) + .waitForRelocatingShards(0) + .timeout(timeout); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); + if (healthResponse.isTimedOut()) { + String message = "timeout waiting for cluster shards"; + logger.error(message); + throw new IllegalStateException(message); + } + } @Override public String getHealthColor(long maxWaitTime, TimeUnit timeUnit) { @@ -130,16 +143,6 @@ public abstract class AbstractNativeClient implements NativeClient { } } - @Override - public Map getMapping(String index, String mapping) { - GetMappingsRequestBuilder getMappingsRequestBuilder = new GetMappingsRequestBuilder(client, GetMappingsAction.INSTANCE) - .setIndices(index) - .setTypes(mapping); - GetMappingsResponse getMappingsResponse = getMappingsRequestBuilder.execute().actionGet(); - logger.info("get mappings response = {}", getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap()); - return getMappingsResponse.getMappings().get(index).get(mapping).getSourceAsMap(); - } - @Override public long getSearchableDocs(String index) { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) @@ -152,7 +155,7 @@ public abstract class AbstractNativeClient implements NativeClient { @Override public boolean isIndexExists(String index) { IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); - indicesExistsRequest.indices(index); + indicesExistsRequest.indices(new String[]{index}); IndicesExistsResponse indicesExistsResponse = client.execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); return indicesExistsResponse.isExists(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 67c6984..d2be48a 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -2,9 +2,6 @@ package org.xbib.elx.common; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -13,6 +10,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.xbib.elx.api.BulkClient; import org.xbib.elx.api.BulkController; +import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; import org.xbib.elx.api.BulkProcessor; import org.xbib.elx.api.IndexDefinition; @@ -47,11 +45,9 @@ public class DefaultBulkController implements BulkController { private BulkListener bulkListener; - private AtomicBoolean active; + private final AtomicBoolean active; - private boolean enableBulkLogging; - - public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) { + public DefaultBulkController(BulkClient client, BulkMetric bulkMetric) { this.client = client; this.bulkMetric = bulkMetric; this.indexNames = new ArrayList<>(); @@ -62,6 +58,11 @@ public class DefaultBulkController implements BulkController { this.maxWaitTimeUnit = TimeUnit.SECONDS; } + @Override + public BulkMetric getBulkMetric() { + return bulkMetric; + } + @Override public Throwable getLastBulkError() { return bulkListener.getLastBulkError(); @@ -78,9 +79,9 @@ public class DefaultBulkController implements BulkController { ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(Parameters.MAX_VOLUME_PER_REQUEST.name(), ByteSizeValue.parseBytesSizeValue(Parameters.DEFAULT_MAX_VOLUME_PER_REQUEST.getString(), "maxVolumePerRequest")); - this.enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), + boolean enableBulkLogging = settings.getAsBoolean(Parameters.ENABLE_BULK_LOGGING.name(), Parameters.ENABLE_BULK_LOGGING.getValue()); - this.bulkListener = new BulkListener(); + this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging); this.bulkProcessor = DefaultBulkProcessor.builder(client.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) @@ -96,6 +97,11 @@ public class DefaultBulkController implements BulkController { } } + @Override + public void inactivate() { + this.active.set(false); + } + @Override public void startBulkMode(IndexDefinition indexDefinition) throws IOException { startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(), @@ -118,65 +124,49 @@ public class DefaultBulkController implements BulkController { } @Override - public void index(IndexRequest indexRequest) { + public void bulkIndex(IndexRequest indexRequest) { ensureActiveAndBulk(); - if (!active.get()) { - throw new IllegalStateException("inactive"); - } try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); - } + bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(), indexRequest.id()); bulkProcessor.add(indexRequest); } catch (Exception e) { - bulkListener.lastBulkError = e; - active.set(false); if (logger.isErrorEnabled()) { logger.error("bulk add of index failed: " + e.getMessage(), e); } + inactivate(); } } @Override - public void delete(DeleteRequest deleteRequest) { - if (!active.get()) { - throw new IllegalStateException("inactive"); - } + public void bulkDelete(DeleteRequest deleteRequest) { + ensureActiveAndBulk(); try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); - } + bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); bulkProcessor.add(deleteRequest); } catch (Exception e) { - bulkListener.lastBulkError = e; - active.set(false); if (logger.isErrorEnabled()) { logger.error("bulk add of delete failed: " + e.getMessage(), e); } + inactivate(); } } @Override - public void update(UpdateRequest updateRequest) { - if (!active.get()) { - throw new IllegalStateException("inactive"); - } + public void bulkUpdate(UpdateRequest updateRequest) { + ensureActiveAndBulk(); try { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); - } + bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(), updateRequest.id()); bulkProcessor.add(updateRequest); } catch (Exception e) { - bulkListener.lastBulkError = e; - active.set(false); if (logger.isErrorEnabled()) { logger.error("bulk add of update failed: " + e.getMessage(), e); } + inactivate(); } } @Override - public boolean waitForResponses(long timeout, TimeUnit timeUnit) { + public boolean waitForBulkResponses(long timeout, TimeUnit timeUnit) { try { return bulkProcessor.awaitFlush(timeout, timeUnit); } catch (InterruptedException e) { @@ -195,7 +185,7 @@ public class DefaultBulkController implements BulkController { @Override public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException { flush(); - if (waitForResponses(timeout, timeUnit)) { + if (waitForBulkResponses(timeout, timeUnit)) { if (indexNames.contains(index)) { Long secs = stopBulkRefreshIntervals.get(index); if (secs != null && secs != 0L) { @@ -243,87 +233,4 @@ public class DefaultBulkController implements BulkController { } } - private class BulkListener implements DefaultBulkProcessor.Listener { - - private final Logger logger = LogManager.getLogger(BulkListener.class.getName()); - - private Throwable lastBulkError = null; - - @Override - public void beforeBulk(long executionId, BulkRequest request) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().inc(); - int n = request.numberOfActions(); - bulkMetric.getSubmitted().inc(n); - bulkMetric.getCurrentIngestNumDocs().inc(n); - bulkMetric.getTotalIngestSizeInBytes().inc(request.estimatedSizeInBytes()); - } - if (enableBulkLogging && logger.isDebugEnabled()) { - logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]", - executionId, - request.numberOfActions(), - request.estimatedSizeInBytes(), - l); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - long l = 0; - if (bulkMetric != null) { - l = bulkMetric.getCurrentIngest().getCount(); - bulkMetric.getCurrentIngest().dec(); - bulkMetric.getSucceeded().inc(response.getItems().length); - } - int n = 0; - for (BulkItemResponse itemResponse : response.getItems()) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); - } - if (itemResponse.isFailed()) { - n++; - if (bulkMetric != null) { - bulkMetric.getSucceeded().dec(1); - bulkMetric.getFailed().inc(1); - } - } - } - if (enableBulkLogging && logger.isDebugEnabled() && bulkMetric != null) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", - executionId, - bulkMetric.getSucceeded().getCount(), - bulkMetric.getFailed().getCount(), - response.getTook().millis(), - l); - } - if (n > 0) { - if (enableBulkLogging && logger.isErrorEnabled()) { - logger.error("bulk [{}] failed with {} failed items, failure message = {}", - executionId, n, response.buildFailureMessage()); - } - } else { - if (bulkMetric != null) { - bulkMetric.getCurrentIngestNumDocs().dec(response.getItems().length); - } - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - if (bulkMetric != null) { - bulkMetric.getCurrentIngest().dec(); - } - lastBulkError = failure; - active.set(false); - if (enableBulkLogging && logger.isErrorEnabled()) { - logger.error("after bulk [" + executionId + "] error", failure); - } - } - - Throwable getLastBulkError() { - return lastBulkError; - } - } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index bb40ba5..49a54bf 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -9,7 +9,7 @@ import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; -class DefaultBulkListener implements BulkListener { +public class DefaultBulkListener implements BulkListener { private final Logger logger = LogManager.getLogger(BulkListener.class.getName()); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index 1350e65..8127e29 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -1,5 +1,6 @@ package org.xbib.elx.common; +import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkMetric; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -36,6 +37,10 @@ public class DefaultBulkMetric implements BulkMetric { submitted = new CountMetric(); succeeded = new CountMetric(); failed = new CountMetric(); + } + + @Override + public void init(Settings settings) { start(); } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index e815b83..1de6169 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -388,7 +388,7 @@ public class DefaultBulkProcessor implements BulkProcessor { listener.beforeBulk(executionId, bulkRequest); semaphore.acquire(); acquired = true; - client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener() { + client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { try { diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index a63c989..bbded5e 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -43,8 +43,8 @@ public class MockAdminClient extends AbstractAdminClient { } @Override - public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) { - return true; + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index 7c78f83..6e7e10a 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -26,6 +26,10 @@ public class MockBulkClient extends AbstractBulkClient { return null; } + @Override + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + } + @Override protected ElasticsearchClient createClient(Settings settings) { return null; diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java index 07c0091..d8d56cc 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockSearchClient.java @@ -3,6 +3,8 @@ package org.xbib.elx.common; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; +import java.util.concurrent.TimeUnit; + /** * A mocked client, it does not perform any actions on a cluster. Useful for testing. */ @@ -22,6 +24,10 @@ public class MockSearchClient extends AbstractSearchClient { return null; } + @Override + public void waitForShards(long maxWaitTime, TimeUnit timeUnit) { + } + @Override protected ElasticsearchClient createClient(Settings settings) { return null; diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java index d469609..6b01fcc 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeBulkClient.java @@ -2,8 +2,8 @@ package org.xbib.elx.node; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -40,9 +40,9 @@ public class NodeBulkClient extends AbstractBulkClient { XContentBuilder builder = XContentFactory.jsonBuilder(); effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true"))); logger.info("creating node client on {} with effective settings {}", - version, Strings.toString(builder)); + version, builder.string()); Collection> plugins = Collections.emptyList(); - this.node = new BulkNode(new Environment(effectiveSettings, null), plugins); + this.node = new BulkNode(new Environment(effectiveSettings), plugins); try { node.start(); } catch (Exception e) { @@ -62,7 +62,7 @@ public class NodeBulkClient extends AbstractBulkClient { private static class BulkNode extends Node { BulkNode(Environment env, Collection> classpathPlugins) { - super(env, classpathPlugins); + super(env, Version.CURRENT, classpathPlugins); } } } diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java index ab7f35d..e6fa89a 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeSearchClient.java @@ -2,8 +2,8 @@ package org.xbib.elx.node; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -40,9 +40,9 @@ public class NodeSearchClient extends AbstractSearchClient { XContentBuilder builder = XContentFactory.jsonBuilder(); effectiveSettings.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true"))); logger.info("creating node client on {} with effective settings {}", - version, Strings.toString(builder)); + version, builder.string()); Collection> plugins = Collections.emptyList(); - this.node = new BulkNode(new Environment(effectiveSettings, null), plugins); + this.node = new BulkNode(new Environment(effectiveSettings), plugins); try { node.start(); } catch (Exception e) { @@ -62,7 +62,7 @@ public class NodeSearchClient extends AbstractSearchClient { private static class BulkNode extends Node { BulkNode(Environment env, Collection> classpathPlugins) { - super(env, classpathPlugins); + super(env, Version.CURRENT, classpathPlugins); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 16b2c69..d98e45d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import org.xbib.elx.node.NodeAdminClient; +import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -71,22 +73,25 @@ class BulkClientTest { @Test void testMapping() throws Exception { - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) - .setBulkClientProvider(NodeBulkClientProvider.class) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + .setAdminClientProvider(NodeAdminClientProvider.class) .build(); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties")); - bulkClient.close(); + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + .setBulkClientProvider(NodeBulkClientProvider.class) + .build()) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + bulkClient.newIndex("test", Settings.EMPTY, builder); + assertTrue(adminClient.getMapping("test", "doc").containsKey("properties")); + } } @Test diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index f7e35a2..6751350 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -33,35 +33,22 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) .setBulkClientProvider(NodeBulkClientProvider.class) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .build(); - try { - client.newIndex("test"); + .build()) { + bulkClient.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { - client.index("test", helper.randomString(1), false, + bulkClient.index("test", helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - client.refreshIndex("test"); - SearchSourceBuilder builder = new SearchSourceBuilder(); - builder.query(QueryBuilders.matchAllQuery()); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices("test"); - searchRequest.types("test"); - searchRequest.source(builder); - long hits = helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet().getHits().getTotalHits(); - logger.info("hits = {}", hits); - assertTrue(hits < ACTIONS); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - client.close(); - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); - if (client.getBulkController().getLastBulkError() != null) { - logger.error("error", client.getBulkController().getLastBulkError()); + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index f121209..cf4821e 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -11,13 +11,11 @@ import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; - import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -35,13 +33,12 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) .setAdminClientProvider(NodeAdminClientProvider.class) .build(); - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) .setBulkClientProvider(NodeBulkClientProvider.class) - .build(); - try { + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -62,20 +59,13 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); - indicesExistsRequest.indices(index); - IndicesExistsResponse indicesExistsResponse = - client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); - list.add(indicesExistsResponse.isExists()); + list.add(adminClient.isIndexExists(index)); } logger.info(list); assertFalse(list.get(0)); assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - } finally { - adminClient.close(); - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 0fdec74..0b1dddb 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -3,6 +3,7 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; @@ -13,11 +14,9 @@ import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; - import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; - import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -34,55 +33,50 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) .setAdminClientProvider(NodeAdminClientProvider.class) .build(); - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) .setBulkClientProvider(NodeBulkClientProvider.class) - .build(); - try { + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) .build(); - client.newIndex("test_shift", settings); + bulkClient.newIndex("test_shift", settings); for (int i = 0; i < 1; i++) { - client.index("test_shift", helper.randomString(1), false, + bulkClient.index("test_shift", helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); IndexShiftResult indexShiftResult = - client.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c")); + adminClient.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c")); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - - Map aliases = client.getAliases("test_shift"); + Map aliases = adminClient.getAliases("test_shift"); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("test")); - - String resolved = client.resolveAlias("test"); - aliases = client.getAliases(resolved); + String resolved = adminClient.resolveAlias("test"); + aliases = adminClient.getAliases(resolved); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("test")); - - client.newIndex("test_shift2", settings); + bulkClient.newIndex("test_shift2", settings); for (int i = 0; i < 1; i++) { - client.index("test_shift2", helper.randomString(1), false, + bulkClient.index("test_shift2", helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - - indexShiftResult = client.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"), - (request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias))) + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + indexShiftResult = adminClient.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"), + (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + index, alias).filter(QueryBuilders.termQuery("my_key", alias))) ); assertTrue(indexShiftResult.getNewAliases().contains("d")); assertTrue(indexShiftResult.getNewAliases().contains("e")); @@ -90,26 +84,21 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - - aliases = client.getAliases("test_shift2"); + aliases = adminClient.getAliases("test_shift2"); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - - resolved = client.resolveAlias("test"); - aliases = client.getAliases(resolved); + resolved = adminClient.resolveAlias("test"); + aliases = adminClient.getAliases(resolved); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - } finally { - adminClient.close(); - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java new file mode 100644 index 0000000..2fa2f0b --- /dev/null +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -0,0 +1,80 @@ +package org.xbib.elx.node.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.Parameters; +import org.xbib.elx.node.NodeBulkClient; +import org.xbib.elx.node.NodeBulkClientProvider; +import org.xbib.elx.node.NodeSearchClient; +import org.xbib.elx.node.NodeSearchClientProvider; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +@ExtendWith(TestExtension.class) +class SearchTest { + + private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); + + private static final Long ACTIONS = 100L; + + private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + + private final TestExtension.Helper helper; + + SearchTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testDocStream() throws Exception { + long numactions = ACTIONS; + final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + .setBulkClientProvider(NodeBulkClientProvider.class) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .build(); + try (bulkClient) { + bulkClient.newIndex("test"); + for (int i = 0; i < ACTIONS; i++) { + bulkClient.index("test", null, false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions, bulkClient.getSearchableDocs("test")); + } + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); + try (NodeSearchClient searchClient = ClientBuilder.builder(helper.client("1")) + .setSearchClientProvider(NodeSearchClientProvider.class) + .build()) { + Stream stream = searchClient.search(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMinutes(1), 10); + long count = stream.count(); + assertEquals(numactions, count); + Stream ids = searchClient.getIds(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery())); + final AtomicInteger idcount = new AtomicInteger(); + ids.forEach(id -> { + logger.info(id); + idcount.incrementAndGet(); + }); + assertEquals(numactions, idcount.get()); + } + } +} diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 76593c6..2a8892d 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -30,56 +30,44 @@ class SmokeTest { @Test void smokeTest() throws Exception { - final NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) + try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client("1")) .setAdminClientProvider(NodeAdminClientProvider.class) .build(); - final NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) + NodeBulkClient bulkClient = ClientBuilder.builder(helper.client("1")) .setBulkClientProvider(NodeBulkClientProvider.class) - .build(); - IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); - try { - assertEquals(helper.getClusterName(), client.getClusterName()); - client.newIndex("test_smoke"); - client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flush(); - client.waitForResponses(30, TimeUnit.SECONDS); - client.checkMapping("test_smoke"); - client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); - client.delete("test_smoke", "1"); - client.flush(); - client.waitForResponses(30, TimeUnit.SECONDS); - client.waitForRecovery("test_smoke", 10L, TimeUnit.SECONDS); - client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); - client.delete("test_smoke", "1"); - client.flush(); - client.deleteIndex("test_smoke"); - IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke", Settings.builder() - .build()); + .build()) { + IndexDefinition indexDefinition = + adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); assertEquals(0, indexDefinition.getReplicaLevel()); - client.newIndex(indexDefinition); - client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); - client.flush(); - client.waitForResponses(30, TimeUnit.SECONDS); - client.updateReplicaLevel(indexDefinition, 2); - int replica = client.getReplicaLevel(indexDefinition); + assertEquals(helper.getClusterName(), adminClient.getClusterName()); + bulkClient.newIndex("test_smoke"); + bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.checkMapping("test_smoke"); + bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete("test_smoke", "1"); + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete("test_smoke", "1"); + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.deleteIndex("test_smoke"); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.updateReplicaLevel(indexDefinition, 2); + int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - client.deleteIndex(indexDefinition); - //assertEquals(0, client.getBulkMetric().getFailed().getCount()); - //assertEquals(6, client.getBulkMetric().getSucceeded().getCount()); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - bulkClient.close(); + assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); + assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } - assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); assertNull(bulkClient.getBulkController().getLastBulkError()); - // close admin after bulk adminClient.deleteIndex(indexDefinition); - adminClient.close(); } } } diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index 4ac9ff2..53147c2 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -1,6 +1,5 @@ package org.xbib.elx.transport; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; @@ -10,48 +9,47 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.transport.netty4.Netty4Plugin; +import org.jboss.netty.channel.DefaultChannelFuture; import org.xbib.elx.common.util.NetworkUtils; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; public class TransportClientHelper { - private static final Logger logger = LogManager.getLogger(TransportAdminClient.class.getName()); - protected ElasticsearchClient createClient(Settings settings) throws IOException { + protected ElasticsearchClient createClient(Settings settings) { if (settings != null) { String systemIdentifier = System.getProperty("os.name") + " " + System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.vendor") + " " + System.getProperty("java.vm.version") + " Elasticsearch " + Version.CURRENT.toString(); - Settings transportClientSettings = getTransportClientSettings(settings); - //XContentBuilder settingsBuilder = XContentFactory.jsonBuilder().startObject(); - XContentBuilder effectiveSettingsBuilder = XContentFactory.jsonBuilder().startObject(); - logger.log(Level.INFO, "creating transport client on {} with settings {}", - systemIdentifier, - //Strings.toString(settings.toXContent(settingsBuilder, ToXContent.EMPTY_PARAMS).endObject()), - Strings.toString(transportClientSettings.toXContent(effectiveSettingsBuilder, - ToXContent.EMPTY_PARAMS).endObject())); - return new MyTransportClient(transportClientSettings, Collections.singletonList(Netty4Plugin.class)); + Settings effectiveSettings = Settings.builder() + // for thread pool size + .put("processors", + settings.getAsInt("processors", Runtime.getRuntime().availableProcessors())) + .put("client.transport.sniff", false) // do not sniff + .put("client.transport.nodes_sampler_interval", "1m") // do not ping + .put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect + .put("client.transport.ignore_cluster_name", true) // connect to any cluster + // custom settings may override defaults + .put(settings) + .build(); + logger.info("creating transport client on {} with custom settings {} and effective settings {}", + systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap()); + + // we need to disable dead lock check because we may have mixed node/transport clients + DefaultChannelFuture.setUseDeadLockChecker(false); + return TransportClient.builder().settings(effectiveSettings).build(); } return null; } @@ -64,33 +62,17 @@ public class TransportClientHelper { } } - @Override - public ExtendedTransportClient init(Settings settings) throws IOException { - super.init(settings); - // additional auto-connect - try { - Collection addrs = findAddresses(settings); - if (!connect(addrs, settings.getAsBoolean("autodiscover", false))) { - throw new NoNodeAvailableException("no cluster nodes available, check settings " - + settings.toString()); - } - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - return this; - } - private Collection findAddresses(Settings settings) throws IOException { final int defaultPort = settings.getAsInt("port", 9300); Collection addresses = new ArrayList<>(); - for (String hostname : settings.getAsList("host")) { + for (String hostname : settings.getAsArray("host")) { String[] splitHost = hostname.split(":", 2); if (splitHost.length == 2) { try { String host = splitHost[0]; InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); int port = Integer.parseInt(splitHost[1]); - TransportAddress address = new TransportAddress(inetAddress, port); + TransportAddress address = new InetSocketTransportAddress(inetAddress, port); addresses.add(address); } catch (NumberFormatException e) { logger.warn(e.getMessage(), e); @@ -99,18 +81,14 @@ public class TransportClientHelper { if (splitHost.length == 1) { String host = splitHost[0]; InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null); - TransportAddress address = new TransportAddress(inetAddress, defaultPort); + TransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort); addresses.add(address); } } return addresses; } - private boolean connect(Collection addresses, boolean autodiscover) { - if (getClient() == null) { - throw new IllegalStateException("no client present"); - } - TransportClient transportClient = (TransportClient) getClient(); + private boolean connect(TransportClient transportClient, Collection addresses, boolean autodiscover) { for (TransportAddress address : addresses) { transportClient.addTransportAddresses(address); } @@ -131,31 +109,9 @@ public class TransportClientHelper { return false; } - private Settings getTransportClientSettings(Settings settings) { - return Settings.builder() - // "cluster.name" - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), - settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey())) - // "processors" - .put(EsExecutors.PROCESSORS_SETTING.getKey(), - settings.get(EsExecutors.PROCESSORS_SETTING.getKey(), - String.valueOf(Runtime.getRuntime().availableProcessors()))) - // "transport.type" - .put(NetworkModule.TRANSPORT_TYPE_KEY, - Netty4Plugin.NETTY_TRANSPORT_NAME) - .build(); - } - private void addDiscoveryNodes(TransportClient transportClient, DiscoveryNodes discoveryNodes) { for (DiscoveryNode discoveryNode : discoveryNodes) { transportClient.addTransportAddress(discoveryNode.getAddress()); } } - - static class MyTransportClient extends TransportClient { - - MyTransportClient(Settings settings, Collection> plugins) { - super(settings, plugins); - } - } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 388f6f3..42cd669 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -11,6 +11,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; +import org.xbib.elx.transport.TransportAdminClient; +import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -66,7 +68,6 @@ class BulkClientTest { final TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) .build(); bulkClient.newIndex("test"); bulkClient.close(); @@ -74,23 +75,27 @@ class BulkClientTest { @Test void testMapping() throws Exception { - final TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) + try (TransportAdminClient adminClient = ClientBuilder.builder() + .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("doc") - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(bulkClient.getMapping("test", "doc").containsKey("properties")); - bulkClient.close(); + TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("doc") + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject() + .endObject(); + bulkClient.newIndex("test", Settings.EMPTY, builder); + assertTrue(adminClient.getMapping("test", "doc").containsKey("properties")); + } } @Test diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index c023ef5..10b90fa 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -2,21 +2,13 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; - import java.util.concurrent.TimeUnit; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -26,9 +18,9 @@ class DuplicateIDTest { private static final Logger logger = LogManager.getLogger(DuplicateIDTest.class.getName()); - private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + private static final Long ACTIONS = 100L; - private static final Long ACTIONS = 5L; + private static final Long MAX_ACTIONS_PER_REQUEST = 10L; private final TestExtension.Helper helper; @@ -39,39 +31,23 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) .put(helper.getTransportSettings()) - .build(); - try { - bulkClient.newIndex("test_dup"); + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .build()) { + bulkClient.newIndex("test"); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test_dup", helper.randomString(1), false, + bulkClient.index("test", helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - client.refreshIndex("test_dup"); - SearchSourceBuilder builder = new SearchSourceBuilder(); - builder.query(QueryBuilders.matchAllQuery()); - builder.size(0); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices("test_dup"); - searchRequest.types("test_dup"); - searchRequest.source(builder); - SearchResponse searchResponse = - helper.client("1").execute(SearchAction.INSTANCE, searchRequest).actionGet(); - long hits = searchResponse.getHits().getTotalHits(); - logger.info("hits = {}", hits); - assertTrue(hits < ACTIONS); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - client.close(); - assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount()); - if (client.getBulkController().getLastBulkError() != null) { - logger.error("error", client.getBulkController().getLastBulkError()); + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index 3304e4e..f451572 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -35,15 +35,14 @@ class IndexPruneTest { @Test void testPrune() throws IOException { - final TransportAdminClient adminClient = ClientBuilder.builder() + try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - final TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build(); - try { + TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -64,20 +63,13 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); - indicesExistsRequest.indices(index); - IndicesExistsResponse indicesExistsResponse = - client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); - list.add(indicesExistsResponse.isExists()); + list.add(adminClient.isIndexExists(index)); } logger.info(list); assertFalse(list.get(0)); assertFalse(list.get(1)); assertTrue(list.get(2)); assertTrue(list.get(3)); - } finally { - adminClient.close(); - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 0b7e126..cfd10af 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -3,6 +3,7 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; @@ -32,55 +33,54 @@ class IndexShiftTest { this.helper = helper; } - @Test + @Test void testIndexShift() throws Exception { - final TransportAdminClient adminClient = ClientBuilder.builder() + try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - final TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build(); - try { + TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) .build(); - client.newIndex("test_shift1234", settings); + bulkClient.newIndex("test_shift", settings); for (int i = 0; i < 1; i++) { - client.index("test_shift1234", helper.randomString(1), false, + bulkClient.index("test_shift", helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); IndexShiftResult indexShiftResult = - client.shiftIndex("test_shift", "test_shift1234", Arrays.asList("a", "b", "c")); + adminClient.shiftIndex("test", "test_shift", Arrays.asList("a", "b", "c")); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - Map aliases = client.getAliases("test_shift1234"); + Map aliases = adminClient.getAliases("test_shift"); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test_shift")); - String resolved = adminClient.resolveAlias("test_shift"); + assertTrue(aliases.containsKey("test")); + String resolved = adminClient.resolveAlias("test"); aliases = adminClient.getAliases(resolved); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test_shift")); - client.newIndex("test_shift5678", settings); + assertTrue(aliases.containsKey("test")); + bulkClient.newIndex("test_shift2", settings); for (int i = 0; i < 1; i++) { - client.index("test_shift5678", helper.randomString(1), false, + bulkClient.index("test_shift2", helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - indexShiftResult = client.shiftIndex("test_shift", "test_shift5678", Arrays.asList("d", "e", "f"), - (request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add() - .index(index).alias(alias).filter(QueryBuilders.termQuery("my_key", alias))) + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + indexShiftResult = adminClient.shiftIndex("test", "test_shift2", Arrays.asList("d", "e", "f"), + (request, index, alias) -> request.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, + index, alias).filter(QueryBuilders.termQuery("my_key", alias))) ); assertTrue(indexShiftResult.getNewAliases().contains("d")); assertTrue(indexShiftResult.getNewAliases().contains("e")); @@ -88,14 +88,14 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - aliases = client.getAliases("test_shift5678"); + aliases = adminClient.getAliases("test_shift2"); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test_shift"); + resolved = adminClient.resolveAlias("test"); aliases = adminClient.getAliases(resolved); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); @@ -103,9 +103,6 @@ class IndexShiftTest { assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - } finally { - adminClient.close(); - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java new file mode 100644 index 0000000..e42c9b1 --- /dev/null +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -0,0 +1,84 @@ +package org.xbib.elx.transport.test; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.Parameters; +import org.xbib.elx.transport.TransportBulkClient; +import org.xbib.elx.transport.TransportBulkClientProvider; +import org.xbib.elx.transport.TransportSearchClient; +import org.xbib.elx.transport.TransportSearchClientProvider; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +@ExtendWith(TestExtension.class) +class SearchTest { + + private static final Logger logger = LogManager.getLogger(SearchTest.class.getName()); + + private static final Long ACTIONS = 100L; + + private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + + private final TestExtension.Helper helper; + + SearchTest(TestExtension.Helper helper) { + this.helper = helper; + } + + @Test + void testDocStream() throws Exception { + long numactions = ACTIONS; + final TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .build(); + try (bulkClient) { + bulkClient.newIndex("test"); + for (int i = 0; i < ACTIONS; i++) { + bulkClient.index("test", null, false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + } + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex("test"); + assertEquals(numactions, bulkClient.getSearchableDocs("test")); + } + assertEquals(numactions, bulkClient.getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); + try (TransportSearchClient searchClient = ClientBuilder.builder() + .setSearchClientProvider(TransportSearchClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { + Stream stream = searchClient.search(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMinutes(1), 10); + long count = stream.count(); + assertEquals(numactions, count); + Stream ids = searchClient.getIds(qb -> qb + .setIndices("test") + .setQuery(QueryBuilders.matchAllQuery())); + final AtomicInteger idcount = new AtomicInteger(); + ids.forEach(id -> { + logger.info(id); + idcount.incrementAndGet(); + }); + assertEquals(numactions, idcount.get()); + } + } +} diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index b0fb265..f55d3be 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -28,57 +28,49 @@ class SmokeTest { this.helper = helper; } + @Test void smokeTest() throws Exception { - final TransportAdminClient adminClient = ClientBuilder.builder() + try (TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); - final TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build(); - IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); - try { - assertEquals(helper.getClusterName(), client.getClusterName()); - client.newIndex("test_smoke"); - client.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flush(); - client.waitForResponses(30, TimeUnit.SECONDS); - client.checkMapping("test_smoke"); - client.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); - client.delete("test_smoke", "1"); - client.flush(); - client.waitForResponses(30, TimeUnit.SECONDS); - client.checkMapping("test_smoke"); - client.deleteIndex("test_smoke"); - IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test_smoke", Settings.builder() - .build()); + TransportBulkClient bulkClient = ClientBuilder.builder() + .setBulkClientProvider(TransportBulkClientProvider.class) + .put(helper.getTransportSettings()) + .build()) { + IndexDefinition indexDefinition = + adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); assertEquals(0, indexDefinition.getReplicaLevel()); - client.newIndex(indexDefinition); - client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); - client.flush(); - client.waitForResponses(30, TimeUnit.SECONDS); - client.updateReplicaLevel(indexDefinition, 2); - int replica = client.getReplicaLevel(indexDefinition); + assertEquals(helper.getClusterName(), adminClient.getClusterName()); + bulkClient.newIndex("test_smoke"); + bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.checkMapping("test_smoke"); + bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete("test_smoke", "1"); + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete("test_smoke", "1"); + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.deleteIndex("test_smoke"); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.flush(); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.updateReplicaLevel(indexDefinition, 2); + int replica = adminClient.getReplicaLevel(indexDefinition); assertEquals(2, replica); - client.deleteIndex(indexDefinition); - assertEquals(0, client.getBulkMetric().getFailed().getCount()); - assertEquals(4, client.getBulkMetric().getSucceeded().getCount()); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } finally { - bulkClient.close(); assertEquals(0, bulkClient.getBulkMetric().getFailed().getCount()); - assertEquals(4, bulkClient.getBulkMetric().getSucceeded().getCount()); + assertEquals(6, bulkClient.getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - // close admin after bulk adminClient.deleteIndex(indexDefinition); - adminClient.close(); } } } diff --git a/elx-transport/src/test/resources/log4j2.xml b/elx-transport/src/test/resources/log4j2-test.xml similarity index 100% rename from elx-transport/src/test/resources/log4j2.xml rename to elx-transport/src/test/resources/log4j2-test.xml