From 46487a691dfb92c0504b9ea3f01b5576fd69a436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Thu, 15 Apr 2021 17:11:00 +0200 Subject: [PATCH] tests working under ES 7.10.2 --- .../java/org/xbib/elx/api/AdminClient.java | 2 +- .../java/org/xbib/elx/api/BulkClient.java | 2 +- .../java/org/xbib/elx/api/BulkController.java | 2 + .../java/org/xbib/elx/api/BulkProcessor.java | 10 +- .../org/xbib/elx/api/IndexDefinition.java | 4 - .../xbib/elx/common/AbstractAdminClient.java | 28 +-- .../xbib/elx/common/AbstractBulkClient.java | 26 ++- .../org/xbib/elx/common/ClientBuilder.java | 5 + .../elx/common/DefaultBulkController.java | 54 +++-- .../xbib/elx/common/DefaultBulkListener.java | 50 ++++- .../xbib/elx/common/DefaultBulkProcessor.java | 48 +++-- .../elx/common/DefaultIndexDefinition.java | 199 +++++++++++++++--- .../org/xbib/elx/common/MockAdminClient.java | 15 -- .../org/xbib/elx/common/MockBulkClient.java | 32 --- .../java/org/xbib/elx/common/Parameters.java | 6 + .../java/org/xbib/elx/http/HttpAction.java | 11 +- .../org/xbib/elx/http/HttpClientHelper.java | 41 +++- .../java/org/xbib/elx/http/package-info.java | 4 - .../xbib/elx/http/test/BulkClientTest.java | 158 ++++++-------- .../xbib/elx/http/test/DuplicateIDTest.java | 29 ++- .../xbib/elx/http/test/IndexPruneTest.java | 29 ++- .../xbib/elx/http/test/IndexShiftTest.java | 54 ++--- .../org/xbib/elx/http/test/SearchTest.java | 33 +-- .../org/xbib/elx/http/test/SmokeTest.java | 46 ++-- .../org/xbib/elx/http/test/TestExtension.java | 21 +- elx-http/src/test/resources/log4j2-test.xml | 2 +- .../org/xbib/elx/node/NodeClientHelper.java | 10 +- .../java/org/xbib/elx/node/package-info.java | 4 - .../xbib/elx/node/test/BulkClientTest.java | 125 +++++------ .../xbib/elx/node/test/DuplicateIDTest.java | 14 +- .../xbib/elx/node/test/IndexPruneTest.java | 31 ++- .../xbib/elx/node/test/IndexShiftTest.java | 44 ++-- .../org/xbib/elx/node/test/SearchTest.java | 48 +++-- .../org/xbib/elx/node/test/SmokeTest.java | 49 +++-- .../org/xbib/elx/node/test/TestExtension.java | 10 +- elx-node/src/test/resources/log4j2-test.xml | 2 +- .../elx/transport/TransportClientHelper.java | 2 +- .../elx/transport/test/BulkClientTest.java | 127 ++++------- .../elx/transport/test/DuplicateIDTest.java | 21 +- .../elx/transport/test/IndexPruneTest.java | 27 ++- .../elx/transport/test/IndexShiftTest.java | 53 ++--- .../xbib/elx/transport/test/SearchTest.java | 64 +++--- .../xbib/elx/transport/test/SmokeTest.java | 49 +++-- .../elx/transport/test/TestExtension.java | 17 +- .../src/test/resources/log4j2-test.xml | 4 +- gradle.properties | 6 +- gradle/test/junit5.gradle | 24 ++- 47 files changed, 888 insertions(+), 754 deletions(-) delete mode 100644 elx-http/src/main/java/org/xbib/elx/http/package-info.java delete mode 100644 elx-node/src/main/java/org/xbib/elx/node/package-info.java diff --git a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java index af95ea6..b58bfb8 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/AdminClient.java @@ -3,7 +3,6 @@ package org.xbib.elx.api; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** * Interface for extended managing and indexing methods of an Elasticsearch client. @@ -81,6 +80,7 @@ public interface AdminClient extends BasicClient { /** * Prune index. + * * @param indexDefinition the index definition * @return the index prune result */ diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 5f2a18a..8072981 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -55,7 +55,7 @@ public interface BulkClient extends BasicClient, Flushable { * Submitting request will be done when limits are exceeded. * * @param indexDefinition the index definition - * @param id the id + * @param id the id * @param create true if document is to be created, false otherwise * @param source the source * @return this client methods diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java index fe2bcce..daaf68f 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java @@ -16,6 +16,8 @@ public interface BulkController extends Closeable, Flushable { void inactivate(); + BulkProcessor getBulkProcessor(); + BulkMetric getBulkMetric(); Throwable getLastBulkError(); diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index f6112bc..fab2a18 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -8,11 +8,17 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { + void setBulkActions(int bulkActions); + + int getBulkActions(); + + void setBulkSize(long bulkSize); + + long getBulkSize(); + BulkProcessor add(DocWriteRequest request); boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; - - BulkListener getBulkListener(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java index 396ddf4..6c738e4 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java +++ b/elx-api/src/main/java/org/xbib/elx/api/IndexDefinition.java @@ -52,10 +52,6 @@ public interface IndexDefinition { boolean isEnabled(); - IndexDefinition setIgnoreErrors(boolean ignoreErrors); - - boolean ignoreErrors(); - IndexDefinition setShift(boolean shift); boolean isShiftEnabled(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java index 1fc3fa4..158cec7 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractAdminClient.java @@ -96,12 +96,15 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements @Override public AdminClient deleteIndex(IndexDefinition indexDefinition) { - ensureClientIsPresent(); + if (!ensureIndexDefinition(indexDefinition)) { + return this; + } String index = indexDefinition.getFullIndexName(); if (index == null) { logger.warn("no index name given to delete index"); return this; } + ensureClientIsPresent(); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest().indices(index); client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest).actionGet(); waitForCluster("YELLOW", 30L, TimeUnit.SECONDS); @@ -187,11 +190,11 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements IndexAbstraction indexAbstraction = clusterStateResponse.getState().getMetadata() .getIndicesLookup().get(alias); if (indexAbstraction == null) { - return List.of(); + return Collections.emptyList(); } List indexMetadata = indexAbstraction.getIndices(); if (indexMetadata == null) { - return List.of(); + return Collections.emptyList(); } return indexMetadata.stream().map(im -> im.getIndex().getName()) .sorted().collect(Collectors.toList()); @@ -204,9 +207,13 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements if (additionalAliases == null) { return new EmptyIndexShiftResult(); } + if (!ensureIndexDefinition(indexDefinition)) { + return new EmptyIndexShiftResult(); + } if (indexDefinition.isShiftEnabled()) { return shiftIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), - additionalAliases.stream().filter(a -> a != null && !a.isEmpty()) + additionalAliases.stream() + .filter(a -> a != null && !a.isEmpty()) .collect(Collectors.toList()), indexAliasAdder); } return new EmptyIndexShiftResult(); @@ -281,18 +288,16 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } if (!indicesAliasesRequest.getAliasActions().isEmpty()) { - logger.debug("indices alias request = {}", indicesAliasesRequest.getAliasActions().toString()); - AcknowledgedResponse indicesAliasesResponse = - client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); - logger.debug("response isAcknowledged = {}", - indicesAliasesResponse.isAcknowledged()); + client.execute(IndicesAliasesAction.INSTANCE, indicesAliasesRequest).actionGet(); } return new SuccessIndexShiftResult(moveAliases, newAliases); } @Override public IndexPruneResult pruneIndex(IndexDefinition indexDefinition) { - return indexDefinition != null && indexDefinition.isPruneEnabled() && indexDefinition.getRetention() != null && indexDefinition.getDateTimePattern() != null ? + return indexDefinition != null && indexDefinition.isPruneEnabled() && + indexDefinition.getRetention() != null && + indexDefinition.getDateTimePattern() != null ? pruneIndex(indexDefinition.getIndex(), indexDefinition.getFullIndexName(), indexDefinition.getDateTimePattern(), @@ -322,8 +327,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements Matcher m = pattern.matcher(s); if (m.matches() && m.group(1).equals(index) && !s.equals(protectedIndexName)) { candidateIndices.add(s); - } else { - logger.info("not a candidate: " + s); } } if (candidateIndices.isEmpty()) { @@ -570,7 +573,6 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements } } - private static class EmptyIndexShiftResult implements IndexShiftResult { @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java index 62893c8..2c590a0 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractBulkClient.java @@ -48,8 +48,6 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements logger.log(Level.INFO, "initializing bulk controller with settings = " + settings.toDelimitedString(',')); bulkController = new DefaultBulkController(this); bulkController.init(settings); - } else { - logger.log(Level.WARN, "not initializing"); } } @@ -101,14 +99,13 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements } Settings settings = Settings.builder().loadFromSource(indexDefinition.getSettings(), XContentType.JSON).build(); createIndexRequestBuilder.setSettings(settings); - Map mappings = indexDefinition.getMappings() == null ? null : - JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered(); - if (mappings != null) { + if (indexDefinition.getMappings() != null) { + Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexDefinition.getMappings()).mapOrdered(); createIndexRequestBuilder.addMapping(TYPE_NAME, mappings); } else { - createIndexRequestBuilder.addMapping(TYPE_NAME, - JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject()); + XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject(TYPE_NAME).endObject().endObject(); + createIndexRequestBuilder.addMapping(TYPE_NAME, builder); } CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet(); if (createIndexResponse.isAcknowledged()) { @@ -146,8 +143,7 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements @Override public BulkClient index(IndexDefinition indexDefinition, String id, boolean create, String source) { - return index(indexDefinition, id, create, - new BytesArray(source.getBytes(StandardCharsets.UTF_8))); + return index(indexDefinition, id, create, new BytesArray(source.getBytes(StandardCharsets.UTF_8))); } @Override @@ -155,8 +151,9 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (!ensureIndexDefinition(indexDefinition)) { return this; } - return index(new IndexRequest().index(indexDefinition.getFullIndexName()).id(id).create(create) - .source(source, XContentType.JSON)); + return index(new IndexRequest() + .index(indexDefinition.getFullIndexName()) + .id(id).create(create).source(source, XContentType.JSON)); } @Override @@ -195,8 +192,9 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements if (!ensureIndexDefinition(indexDefinition)) { return this; } - return update(new UpdateRequest().index(indexDefinition.getFullIndexName()).id(id) - .doc(source, XContentType.JSON)); + return update(new UpdateRequest() + .index(indexDefinition.getFullIndexName()) + .id(id).doc(source, XContentType.JSON)); } @Override diff --git a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java index 49133a3..581f04f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java +++ b/elx-common/src/main/java/org/xbib/elx/common/ClientBuilder.java @@ -93,6 +93,11 @@ public class ClientBuilder { return this; } + public ClientBuilder put(String key, Boolean value) { + settingsBuilder.put(key, value); + return this; + } + public ClientBuilder put(String key, Long value) { settingsBuilder.put(key, value); return this; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java index 58d3c69..7dd072d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkController.java @@ -33,9 +33,9 @@ public class DefaultBulkController implements BulkController { private BulkListener bulkListener; - private final long maxWaitTime; + private long maxWaitTime; - private final TimeUnit maxWaitTimeUnit; + private TimeUnit maxWaitTimeUnit; private final AtomicBoolean active; @@ -43,23 +43,17 @@ public class DefaultBulkController implements BulkController { this.bulkClient = bulkClient; this.bulkMetric = new DefaultBulkMetric(); this.active = new AtomicBoolean(false); - this.maxWaitTime = 30L; - this.maxWaitTimeUnit = TimeUnit.SECONDS; - } - - @Override - public BulkMetric getBulkMetric() { - return bulkMetric; - } - - @Override - public Throwable getLastBulkError() { - return bulkProcessor.getBulkListener().getLastBulkError(); } @Override public void init(Settings settings) { bulkMetric.init(settings); + String maxWaitTimeStr = settings.get(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), + Parameters.MAX_WAIT_BULK_RESPONSE.getString()); + TimeValue maxWaitTimeValue = TimeValue.parseTimeValue(maxWaitTimeStr, + TimeValue.timeValueSeconds(30), ""); + this.maxWaitTime = maxWaitTimeValue.seconds(); + this.maxWaitTimeUnit = TimeUnit.SECONDS; int maxActionsPerRequest = settings.getAsInt(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), Parameters.MAX_ACTIONS_PER_REQUEST.getInteger()); int maxConcurrentRequests = settings.getAsInt(Parameters.MAX_CONCURRENT_REQUESTS.getName(), @@ -74,7 +68,10 @@ public class DefaultBulkController implements BulkController { Parameters.ENABLE_BULK_LOGGING.getBoolean()); boolean failOnBulkError = settings.getAsBoolean(Parameters.FAIL_ON_BULK_ERROR.getName(), Parameters.FAIL_ON_BULK_ERROR.getBoolean()); - this.bulkListener = new DefaultBulkListener(this, bulkMetric, enableBulkLogging, failOnBulkError); + int responseTimeCount = settings.getAsInt(Parameters.RESPONSE_TIME_COUNT.getName(), + Parameters.RESPONSE_TIME_COUNT.getInteger()); + this.bulkListener = new DefaultBulkListener(this, bulkMetric, + enableBulkLogging, failOnBulkError, responseTimeCount); this.bulkProcessor = DefaultBulkProcessor.builder(bulkClient.getClient(), bulkListener) .setBulkActions(maxActionsPerRequest) .setConcurrentRequests(maxConcurrentRequests) @@ -83,17 +80,32 @@ public class DefaultBulkController implements BulkController { .build(); this.active.set(true); if (logger.isInfoEnabled()) { - logger.info("bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " + + logger.info("bulk processor now active with maxWaitTime = {} maxActionsPerRequest = {} maxConcurrentRequests = {} " + "flushIngestInterval = {} maxVolumePerRequest = {} " + "bulk logging = {} fail on bulk error = {} " + "logger debug = {} from settings = {}", - maxActionsPerRequest, maxConcurrentRequests, + maxWaitTimeStr, maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest, enableBulkLogging, failOnBulkError, logger.isDebugEnabled(), settings.toDelimitedString(',')); } } + @Override + public BulkProcessor getBulkProcessor() { + return bulkProcessor; + } + + @Override + public BulkMetric getBulkMetric() { + return bulkMetric; + } + + @Override + public Throwable getLastBulkError() { + return bulkListener != null ? bulkListener.getLastBulkError() : null; + } + @Override public void inactivate() { this.active.set(false); @@ -113,8 +125,8 @@ public class DefaultBulkController implements BulkController { public void bulkIndex(IndexRequest indexRequest) { ensureActiveAndBulk(); try { - bulkProcessor.add(indexRequest); bulkMetric.getCurrentIngest().inc(indexRequest.index(), TYPE_NAME, indexRequest.id()); + bulkProcessor.add(indexRequest); } catch (Exception e) { if (logger.isErrorEnabled()) { logger.error("bulk add of index failed: " + e.getMessage(), e); @@ -127,8 +139,8 @@ public class DefaultBulkController implements BulkController { public void bulkDelete(DeleteRequest deleteRequest) { ensureActiveAndBulk(); try { - bulkProcessor.add(deleteRequest); bulkMetric.getCurrentIngest().inc(deleteRequest.index(), TYPE_NAME, deleteRequest.id()); + bulkProcessor.add(deleteRequest); } catch (Exception e) { if (logger.isErrorEnabled()) { logger.error("bulk add of delete failed: " + e.getMessage(), e); @@ -141,8 +153,8 @@ public class DefaultBulkController implements BulkController { public void bulkUpdate(UpdateRequest updateRequest) { ensureActiveAndBulk(); try { - bulkProcessor.add(updateRequest); bulkMetric.getCurrentIngest().inc(updateRequest.index(), TYPE_NAME, updateRequest.id()); + bulkProcessor.add(updateRequest); } catch (Exception e) { if (logger.isErrorEnabled()) { logger.error("bulk add of update failed: " + e.getMessage(), e); @@ -191,8 +203,8 @@ public class DefaultBulkController implements BulkController { @Override public void close() throws IOException { - flush(); bulkMetric.close(); + flush(); bulkClient.waitForResponses(maxWaitTime, maxWaitTimeUnit); if (bulkProcessor != null) { bulkProcessor.close(); diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java index 2c191c8..a504ae0 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkListener.java @@ -8,6 +8,9 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.xbib.elx.api.BulkController; import org.xbib.elx.api.BulkListener; import org.xbib.elx.api.BulkMetric; +import java.util.Arrays; +import java.util.LongSummaryStatistics; +import java.util.stream.LongStream; public class DefaultBulkListener implements BulkListener { @@ -23,14 +26,21 @@ public class DefaultBulkListener implements BulkListener { private Throwable lastBulkError; + private final int responseTimeCount; + + private final LastResponseTimes responseTimes; + public DefaultBulkListener(BulkController bulkController, BulkMetric bulkMetric, boolean isBulkLoggingEnabled, - boolean failOnError) { + boolean failOnError, + int responseTimeCount) { this.bulkController = bulkController; this.bulkMetric = bulkMetric; this.isBulkLoggingEnabled = isBulkLoggingEnabled; this.failOnError = failOnError; + this.responseTimeCount = responseTimeCount; + this.responseTimes = new LastResponseTimes(responseTimeCount); } @Override @@ -56,6 +66,16 @@ public class DefaultBulkListener implements BulkListener { bulkMetric.getCurrentIngest().dec(); bulkMetric.getSucceeded().inc(response.getItems().length); bulkMetric.markTotalIngest(response.getItems().length); + if (responseTimeCount > 0 && responseTimes.add(response.getTook().millis()) == 0) { + LongSummaryStatistics stat = responseTimes.longStream().summaryStatistics(); + if (isBulkLoggingEnabled && logger.isDebugEnabled()) { + logger.debug("bulk response millis: avg = " + stat.getAverage() + + " min =" + stat.getMin() + + " max = " + stat.getMax() + + " actions = " + bulkController.getBulkProcessor().getBulkActions() + + " size = " + bulkController.getBulkProcessor().getBulkSize()); + } + } int n = 0; for (BulkItemResponse itemResponse : response.getItems()) { bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId()); @@ -66,7 +86,7 @@ public class DefaultBulkListener implements BulkListener { } } if (isBulkLoggingEnabled && logger.isDebugEnabled()) { - logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests", + logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] [concurrent requests={}]", executionId, bulkMetric.getSucceeded().getCount(), bulkMetric.getFailed().getCount(), @@ -101,4 +121,30 @@ public class DefaultBulkListener implements BulkListener { public Throwable getLastBulkError() { return lastBulkError; } + + private static class LastResponseTimes { + + private final Long[] values; + + private final int limit; + + private int index; + + public LastResponseTimes(int limit) { + this.values = new Long[limit]; + Arrays.fill(values, -1L); + this.limit = limit; + this.index = 0; + } + + public int add(Long value) { + int i = index++ % limit; + values[i] = value; + return i; + } + + public LongStream longStream() { + return Arrays.stream(values).filter(v -> v != -1L).mapToLong(Long::longValue); + } + } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index e7468cd..547fd6d 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -25,19 +25,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** - * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request - * (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk + * A bulk processor is a thread safe bulk processing class, allowing to easily + * set when to "flush" a new bulk request + * (either based on number of actions, based on the size, or time), and + * to easily control the number of concurrent bulk * requests allowed to be executed in parallel. * In order to create a new bulk processor, use the {@link Builder}. */ public class DefaultBulkProcessor implements BulkProcessor { - private final BulkListener bulkListener; - - private final int bulkActions; - - private final long bulkSize; - private final ScheduledThreadPoolExecutor scheduler; private final ScheduledFuture scheduledFuture; @@ -48,6 +44,10 @@ public class DefaultBulkProcessor implements BulkProcessor { private BulkRequest bulkRequest; + private int bulkActions; + + private long bulkSize; + private volatile boolean closed; private DefaultBulkProcessor(ElasticsearchClient client, @@ -57,7 +57,6 @@ public class DefaultBulkProcessor implements BulkProcessor { int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { - this.bulkListener = bulkListener; this.executionIdGen = new AtomicLong(); this.closed = false; this.bulkActions = bulkActions; @@ -80,17 +79,31 @@ public class DefaultBulkProcessor implements BulkProcessor { } } - @Override - public BulkListener getBulkListener() { - return bulkListener; - } - public static Builder builder(ElasticsearchClient client, BulkListener bulkListener) { - Objects.requireNonNull(client, "The client you specified while building a BulkProcessor is null"); Objects.requireNonNull(bulkListener, "A listener for the BulkProcessor is required but null"); return new Builder(client, bulkListener); } + @Override + public void setBulkActions(int bulkActions) { + this.bulkActions = bulkActions; + } + + @Override + public int getBulkActions() { + return bulkActions; + } + + @Override + public void setBulkSize(long bulkSize) { + this.bulkSize = bulkSize; + } + + @Override + public long getBulkSize() { + return bulkSize; + } + /** * Wait for bulk request handler with flush. * @param timeout the timeout value @@ -154,9 +167,8 @@ public class DefaultBulkProcessor implements BulkProcessor { public synchronized DefaultBulkProcessor add(DocWriteRequest request) { ensureOpen(); bulkRequest.add(request); - if (bulkActions != -1 && - bulkRequest.numberOfActions() >= bulkActions || - bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) { + if ((bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) || + (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize)) { execute(); } return this; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java index ace4d6a..3d2134f 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultIndexDefinition.java @@ -1,10 +1,28 @@ package org.xbib.elx.common; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.yaml.YamlXContent; +import org.xbib.elx.api.AdminClient; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.api.IndexRetention; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.MalformedInputException; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -12,6 +30,8 @@ public class DefaultIndexDefinition implements IndexDefinition { private String index; + private String type; + private String fullIndexName; private DateTimeFormatter formatter; @@ -24,8 +44,6 @@ public class DefaultIndexDefinition implements IndexDefinition { private boolean enabled; - private boolean ignoreErrors; - private boolean shift; private boolean prune; @@ -40,13 +58,64 @@ public class DefaultIndexDefinition implements IndexDefinition { private TimeUnit maxWaitTimeUnit; - private long startRefreshInterval; + private int startRefreshInterval; - private long stopRefreshInterval; + private int stopRefreshInterval; - public DefaultIndexDefinition() { - this.pattern = Pattern.compile("^(.*?)(\\d+)$"); - this.formatter = DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault()); + public DefaultIndexDefinition(String index, String type) { + setIndex(index); + setType(type); + setDateTimeFormatter(DateTimeFormatter.ofPattern("yyyyMMdd", Locale.getDefault())); + setDateTimePattern(Pattern.compile("^(.*?)(\\d+)$")); + setFullIndexName(index + getDateTimeFormatter().format(LocalDateTime.now())); + setMaxWaitTime(Parameters.MAX_WAIT_BULK_RESPONSE_SECONDS.getInteger(), TimeUnit.SECONDS); + setShift(false); + setPrune(false); + setEnabled(true); + } + + public DefaultIndexDefinition(AdminClient adminClient, String index, String type, Settings settings) + throws IOException { + TimeValue timeValue = settings.getAsTime(Parameters.MAX_WAIT_BULK_RESPONSE.getName(), TimeValue.timeValueSeconds(30)); + setMaxWaitTime(timeValue.seconds(), TimeUnit.SECONDS); + String indexName = settings.get("name", index); + String indexType = settings.get("type", type); + boolean enabled = settings.getAsBoolean("enabled", true); + setIndex(indexName); + setType(indexType); + setEnabled(enabled); + String fullIndexName = adminClient.resolveAlias(indexName).stream().findFirst().orElse(indexName); + setFullIndexName(fullIndexName); + if (settings.get("settings") != null && settings.get("mapping") != null) { + setSettings(findSettingsFrom(settings.get("settings"))); + setMappings(findMappingsFrom(settings.get("mapping"))); + setStartBulkRefreshSeconds(settings.getAsInt(Parameters.START_BULK_REFRESH_SECONDS.getName(), -1)); + setStopBulkRefreshSeconds(settings.getAsInt(Parameters.STOP_BULK_REFRESH_SECONDS.getName(), -1)); + setReplicaLevel(settings.getAsInt("replica", 0)); + boolean shift = settings.getAsBoolean("shift", false); + setShift(shift); + if (shift) { + String dateTimeFormat = settings.get(Parameters.DATE_TIME_FORMAT.getName(), + Parameters.DATE_TIME_FORMAT.getString()); + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.getDefault()) + .withZone(ZoneId.systemDefault()); + setDateTimeFormatter(dateTimeFormatter); + String dateTimePatternStr = settings.get("dateTimePattern", "^(.*?)(\\\\d+)$"); + Pattern dateTimePattern = Pattern.compile(dateTimePatternStr); + setDateTimePattern(dateTimePattern); + String fullName = indexName + dateTimeFormatter.format(LocalDateTime.now()); + fullIndexName = adminClient.resolveAlias(fullName).stream().findFirst().orElse(fullName); + setFullIndexName(fullIndexName); + boolean prune = settings.getAsBoolean("prune", false); + setPrune(prune); + if (prune) { + IndexRetention indexRetention = new DefaultIndexRetention() + .setMinToKeep(settings.getAsInt("retention.mintokeep", 0)) + .setDelta(settings.getAsInt("retention.delta", 0)); + setRetention(indexRetention); + } + } + } } @Override @@ -60,6 +129,17 @@ public class DefaultIndexDefinition implements IndexDefinition { return index; } + @Override + public IndexDefinition setType(String type) { + this.type = type; + return this; + } + + @Override + public String getType() { + return type; + } + @Override public IndexDefinition setFullIndexName(String fullIndexName) { this.fullIndexName = fullIndexName; @@ -115,6 +195,24 @@ public class DefaultIndexDefinition implements IndexDefinition { return pattern; } + public IndexDefinition setStartBulkRefreshSeconds(int seconds) { + this.startRefreshInterval = seconds; + return this; + } + + public int getStartBulkRefreshSeconds() { + return startRefreshInterval; + } + + public IndexDefinition setStopBulkRefreshSeconds(int seconds) { + this.stopRefreshInterval = seconds; + return this; + } + + public int getStopBulkRefreshSeconds() { + return stopRefreshInterval; + } + @Override public IndexDefinition setEnabled(boolean enabled) { this.enabled = enabled; @@ -126,17 +224,6 @@ public class DefaultIndexDefinition implements IndexDefinition { return enabled; } - @Override - public IndexDefinition setIgnoreErrors(boolean ignoreErrors) { - this.ignoreErrors = ignoreErrors; - return this; - } - - @Override - public boolean ignoreErrors() { - return ignoreErrors; - } - @Override public IndexDefinition setShift(boolean shift) { this.shift = shift; @@ -209,26 +296,68 @@ public class DefaultIndexDefinition implements IndexDefinition { return maxWaitTimeUnit; } - @Override - public IndexDefinition setStartRefreshInterval(long seconds) { - this.startRefreshInterval = seconds; - return this; + private static String findSettingsFrom(String string) throws IOException { + if (string == null) { + return null; + } + try { + XContentBuilder builder = JsonXContent.contentBuilder(); + try (InputStream inputStream = findInputStream(string)) { + if (inputStream != null) { + Settings settings = Settings.builder().loadFromStream(string, inputStream, true).build(); + builder.startObject(); + settings.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + } + } + return Strings.toString(builder); + } catch (MalformedURLException e) { + return string; + } catch (IOException e) { + throw new IOException("unable to read JSON from " + string + ": " + e.getMessage(), e); + } } - @Override - public long getStartRefreshInterval() { - return startRefreshInterval; + private static String findMappingsFrom(String string) throws IOException { + if (string == null) { + return null; + } + try { + XContentBuilder builder = JsonXContent.contentBuilder(); + try (InputStream inputStream = findInputStream(string)) { + if (inputStream != null) { + if (string.endsWith(".json")) { + Map mappings = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered(); + builder.map(mappings); + } + if (string.endsWith(".yml") || string.endsWith(".yaml")) { + Map mappings = YamlXContent.yamlXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream).mapOrdered(); + builder.map(mappings); + } + } + } + return Strings.toString(builder); + } catch (MalformedInputException e) { + return string; + } catch (IOException e) { + throw new IOException("unable to read JSON from " + string + ": " + e.getMessage(), e); + } } - @Override - public IndexDefinition setStopRefreshInterval(long seconds) { - this.stopRefreshInterval = seconds; - return this; + private static InputStream findInputStream(String string) { + if (string == null) { + return null; + } + try { + URL url = ClassLoader.getSystemClassLoader().getResource(string); + if (url == null) { + url = new URL(string); + } + return url.openStream(); + } catch (IOException e) { + return null; + } } - - @Override - public long getStopRefreshInterval() { - return stopRefreshInterval; - } - } diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java index ebc1450..a03a5b6 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockAdminClient.java @@ -28,16 +28,6 @@ public class MockAdminClient extends AbstractAdminClient { protected void closeClient(Settings settings) { } - @Override - public MockAdminClient deleteIndex(String index) { - return this; - } - - @Override - public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) { - return true; - } - @Override public void waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) { } @@ -47,11 +37,6 @@ public class MockAdminClient extends AbstractAdminClient { } - @Override - public MockAdminClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) { - return this; - } - @Override public void close() { // nothing to do diff --git a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java index 5494d90..e0e861b 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/MockBulkClient.java @@ -39,21 +39,6 @@ public class MockBulkClient extends AbstractBulkClient { protected void closeClient(Settings settings) { } - @Override - public MockBulkClient index(String index, String id, boolean create, String source) { - return this; - } - - @Override - public MockBulkClient delete(String index, String id) { - return this; - } - - @Override - public MockBulkClient update(String index, String id, String source) { - return this; - } - @Override public MockBulkClient index(IndexRequest indexRequest) { return this; @@ -69,28 +54,11 @@ public class MockBulkClient extends AbstractBulkClient { return this; } - @Override - public void startBulk(String index, long startRefreshInterval, long stopRefreshIterval) { - } - - @Override - public void stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) { - } - @Override public boolean waitForResponses(long maxWaitTime, TimeUnit timeUnit) { return true; } - @Override - public void refreshIndex(String index) { - } - - @Override - public void flushIndex(String index) { - } - - @Override public void flush() { // nothing to do diff --git a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java index e042904..0dabaf9 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/Parameters.java +++ b/elx-common/src/main/java/org/xbib/elx/common/Parameters.java @@ -2,6 +2,10 @@ package org.xbib.elx.common; public enum Parameters { + DATE_TIME_FORMAT("dateTimeFormat", String.class, "yyyyMMdd"), + + MAX_WAIT_BULK_RESPONSE("bulk.max_wait_response", String.class, "30s"), + MAX_WAIT_BULK_RESPONSE_SECONDS("bulk.max_wait_response_seconds", Integer.class, 30), START_BULK_REFRESH_SECONDS("bulk.start_refresh_seconds", Integer.class, 0), @@ -14,6 +18,8 @@ public enum Parameters { MAX_ACTIONS_PER_REQUEST("bulk.max_actions_per_request", Integer.class, 1000), + RESPONSE_TIME_COUNT("bulk.response_time_count", Integer.class, 64), + // 0 = 1 CPU, synchronous requests, > 0 = n + 1 CPUs, asynchronous requests MAX_CONCURRENT_REQUESTS("bulk.max_concurrent_requests", Integer.class, Runtime.getRuntime().availableProcessors() - 1), diff --git a/elx-http/src/main/java/org/xbib/elx/http/HttpAction.java b/elx-http/src/main/java/org/xbib/elx/http/HttpAction.java index 0cdf1e8..6dcb579 100644 --- a/elx-http/src/main/java/org/xbib/elx/http/HttpAction.java +++ b/elx-http/src/main/java/org/xbib/elx/http/HttpAction.java @@ -61,8 +61,8 @@ public abstract class HttpAction { - if (logger.isDebugEnabled()) { - logger.log(Level.DEBUG, "got response: " + fullHttpResponse.getStatus().getCode() + + if (logger.isTraceEnabled()) { + logger.log(Level.TRACE, "got response: " + fullHttpResponse.getStatus().getCode() + " headers = " + fullHttpResponse.getHeaders() + " content = " + fullHttpResponse.getBody().toString(StandardCharsets.UTF_8)); } @@ -78,8 +78,8 @@ public abstract class HttpAction(); + this.closed = new AtomicBoolean(); } @SuppressWarnings({"unchecked", "rawtypes"}) - public HttpClientHelper init(Settings settings) throws IOException { + public void init(Settings settings) { + HttpAddress httpAddress; if (settings.hasValue("url")) { this.url = settings.get("url"); - } else if (settings.hasValue("host")) { - this.url = URL.http() + httpAddress = HttpAddress.http1(this.url); + } else if (settings.hasValue("host") && settings.hasValue("post")) { + URL u = URL.http() .host(settings.get("host")).port(settings.getAsInt("port", 9200)) - .build() - .toExternalForm(); + .build(); + httpAddress = HttpAddress.http1(u); + this.url = u.toExternalForm(); + } else { + URL u = URL.http().host("localhost").port(9200).build(); + httpAddress = HttpAddress.http1(u); + this.url = u.toExternalForm(); } ServiceLoader httpActionServiceLoader = ServiceLoader.load(HttpAction.class, classLoader); for (HttpAction httpAction : httpActionServiceLoader) { @@ -73,13 +85,18 @@ public class HttpClientHelper { actionMap.put(httpAction.getActionInstance(), httpAction); } Client.Builder clientBuilder = Client.builder(); + if (settings.getAsBoolean("pool.enabled", true)) { + clientBuilder.addPoolNode(httpAddress) + .setPoolNodeConnectionLimit(settings.getAsInt("pool.limit", Runtime.getRuntime().availableProcessors())); + } if (settings.hasValue("debug")) { clientBuilder.enableDebug(); } this.nettyHttpClient = clientBuilder.build(); - logger.log(Level.DEBUG, "extended HTTP client initialized, settings = {}, url = {}, {} actions", - settings, url, actionMap.size()); - return this; + if (logger.isDebugEnabled()) { + logger.log(Level.DEBUG, "HTTP client initialized, pooled = {}, settings = {}, url = {}, {} actions", + nettyHttpClient.isPooled(), settings, url, actionMap.size()); + } } private static List getNamedXContents() { @@ -95,7 +112,9 @@ public class HttpClientHelper { } protected void closeClient(Settings settings) throws IOException { - nettyHttpClient.shutdownGracefully(); + if (closed.compareAndSet(false, true)) { + nettyHttpClient.shutdownGracefully(); + } } public @@ -124,7 +143,9 @@ public class HttpClientHelper { } try { HttpActionContext httpActionContext = new HttpActionContext(this, request, url); - logger.log(Level.DEBUG, "url = " + url); + if (logger.isTraceEnabled()) { + logger.log(Level.TRACE, "url = " + url); + } httpAction.execute(httpActionContext, listener); } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/elx-http/src/main/java/org/xbib/elx/http/package-info.java b/elx-http/src/main/java/org/xbib/elx/http/package-info.java deleted file mode 100644 index ef5876c..0000000 --- a/elx-http/src/main/java/org/xbib/elx/http/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Classes for Elasticsearch HTTP client. - */ -package org.xbib.elx.http; diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java index 648be9e..9b11bad 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/BulkClientTest.java @@ -2,16 +2,13 @@ package org.xbib.elx.http.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; -import org.xbib.elx.http.HttpAdminClient; -import org.xbib.elx.http.HttpAdminClientProvider; import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; @@ -22,16 +19,15 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class BulkClientTest { private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getSimpleName()); - private static final Long ACTIONS = 100L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10L; + private static final Long MAX_ACTIONS_PER_REQUEST = 1000L; private final TestExtension.Helper helper; @@ -40,143 +36,107 @@ class BulkClientTest { } @Test - void testSingleDoc() throws Exception { - final HttpBulkClient client = ClientBuilder.builder() + void testNewIndex() throws Exception { + try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) - .build(); - try { - client.newIndex("test"); - client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - } finally { - assertEquals(1, client.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (client.getBulkController().getLastBulkError() != null) { - logger.error("error", client.getBulkController().getLastBulkError()); - } - assertNull(client.getBulkController().getLastBulkError()); - client.close(); + .put(Parameters.FLUSH_INTERVAL.getName(), TimeValue.timeValueSeconds(5)) + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); } } @Test - void testNewIndex() throws Exception { - final HttpBulkClient client = ClientBuilder.builder() + void testSingleDoc() throws Exception { + try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) - .build(); - client.newIndex("test"); - client.close(); - } - - @Test - void testMapping() throws Exception { - try (HttpAdminClient adminClient = ClientBuilder.builder() - .setAdminClientProvider(HttpAdminClientProvider.class) - .put(helper.getHttpSettings()) - .build(); - HttpBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(HttpBulkClientProvider.class) - .put(helper.getHttpSettings()) - .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(adminClient.getMapping("test").containsKey("properties")); + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "30s") + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.flush(); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); } } @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; - final HttpBulkClient client = ClientBuilder.builder() + try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .build(); - try { - client.newIndex("test"); + .put(Parameters.FLUSH_INTERVAL.name(), "60s") + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - client.index("test", null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - } finally { - assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (client.getBulkController().getLastBulkError() != null) { - logger.error("error", client.getBulkController().getLastBulkError()); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); } - assertNull(client.getBulkController().getLastBulkError()); - client.refreshIndex("test"); - assertEquals(numactions, client.getSearchableDocs("test")); - client.close(); + assertNull(bulkClient.getBulkController().getLastBulkError()); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } } @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); - Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; + long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; - logger.info("maxthreads={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions); - final HttpBulkClient bulkClient = ClientBuilder.builder() + long timeout = 120L; + try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads * 2) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .build(); - try { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test", settings); - bulkClient.startBulk("test", 0, 1000); - logger.info("index created"); + .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setStartBulkRefreshSeconds(0); // disable refresh + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { executorService.execute(() -> { for (int i1 = 0; i1 < actions; i1++) { - bulkClient.index("test", null, false,"{ \"name\" : \"" + helper.randomString(32) + "\"}"); + bulkClient.index(indexDefinition, null, false, + "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } latch.countDown(); }); } - logger.info("waiting for latch..."); - if (latch.await(60L, TimeUnit.SECONDS)) { - logger.info("flush..."); - bulkClient.flush(); - bulkClient.waitForResponses(60L, TimeUnit.SECONDS); - logger.info("got all responses, executor service shutdown..."); + if (latch.await(timeout, TimeUnit.SECONDS)) { + bulkClient.waitForResponses(timeout, TimeUnit.SECONDS); executorService.shutdown(); - executorService.awaitTermination(60L, TimeUnit.SECONDS); - logger.info("pool is shut down"); + executorService.awaitTermination(timeout, TimeUnit.SECONDS); } else { - logger.warn("latch timeout"); + logger.error("latch timeout!"); } - bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); + bulkClient.refreshIndex(indexDefinition); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - } finally { if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java index 066ea7c..ab3b070 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/DuplicateIDTest.java @@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; @@ -33,29 +35,26 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - final HttpBulkClient client = ClientBuilder.builder() + try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .build(); - try { - client.newIndex("test"); + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - client.index("test", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - client.flush(); - client.waitForResponses(30L, TimeUnit.SECONDS); - client.refreshIndex("test"); - long hits = client.getSearchableDocs("test"); + bulkClient.waitForResponses(30L, TimeUnit.SECONDS); + bulkClient.refreshIndex(indexDefinition); + long hits = bulkClient.getSearchableDocs(indexDefinition); assertTrue(hits < ACTIONS); - } finally { - client.close(); - assertEquals(numactions, client.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (client.getBulkController().getLastBulkError() != null) { - logger.error("error", client.getBulkController().getLastBulkError()); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); } - assertNull(client.getBulkController().getLastBulkError()); + assertNull(bulkClient.getBulkController().getLastBulkError()); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java index fa25eed..ad1640b 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexPruneTest.java @@ -2,7 +2,6 @@ package org.xbib.elx.http.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.settings.Settings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; @@ -47,32 +46,28 @@ class IndexPruneTest { .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) .build()) { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test1", settings); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test"); indexDefinition.setFullIndexName("test1"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test2", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test2"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test3", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test3"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test4", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test4"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - indexDefinition.setPrune(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); IndexRetention indexRetention = new DefaultIndexRetention(); indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); + indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test1")); @@ -81,7 +76,9 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test1", "test2", "test3", "test4")) { - list.add(adminClient.isIndexExists(index)); + IndexDefinition indexDefinition1 = new DefaultIndexDefinition(index, null); + indexDefinition1.setFullIndexName(index); + list.add(adminClient.isIndexExists(indexDefinition1)); } logger.info(list); assertFalse(list.get(0)); diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java index a56ea63..5917326 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/IndexShiftTest.java @@ -18,7 +18,9 @@ import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; import java.util.Arrays; +import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -38,57 +40,49 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - final HttpAdminClient adminClient = ClientBuilder.builder() + try (HttpAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(HttpAdminClientProvider.class) .put(helper.getHttpSettings()) .build(); - final HttpBulkClient bulkClient = ClientBuilder.builder() + HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .build(); - try { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test1234", settings); + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setFullIndexName("test_shift"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test1234", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test"); - indexDefinition.setFullIndexName("test1234"); indexDefinition.setShift(true); - IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c")); + IndexShiftResult indexShiftResult = + adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - Map aliases = adminClient.getAliases("test1234"); + Map aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); logger.log(Level.DEBUG, "aliases = " + aliases); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); - String resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null); - logger.log(Level.DEBUG, "resolved = " + resolved); - aliases = adminClient.getAliases(resolved); - logger.log(Level.DEBUG, "aliases = " + aliases); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); + Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); + aliases = resolved.isPresent() ? + adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("test")); - bulkClient.newIndex("test5678", settings); + indexDefinition.setFullIndexName("test_shift2"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test5678", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - indexDefinition.setFullIndexName("test5678"); indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), (request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add() @@ -100,29 +94,25 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - aliases = adminClient.getAliases("test5678"); + aliases = adminClient.getAliases("test_shift2"); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null); - assertNotNull(resolved); - aliases = adminClient.getAliases(resolved); + resolved = adminClient.resolveAlias("test").stream().findFirst(); + aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - } finally { - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - adminClient.close(); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java index 819b592..7056fa8 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SearchTest.java @@ -9,7 +9,9 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; import org.xbib.elx.http.HttpBulkClient; import org.xbib.elx.http.HttpBulkClientProvider; @@ -37,39 +39,38 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; - final HttpBulkClient bulkClient = ClientBuilder.builder() + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + try (HttpBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(HttpBulkClientProvider.class) .put(helper.getHttpSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .build(); - try (bulkClient) { - bulkClient.newIndex("test"); + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .build()) { + bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); } - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); - } - assertNull(bulkClient.getBulkController().getLastBulkError()); try (HttpSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(HttpSearchClientProvider.class) .put(helper.getHttpSettings()) .build()) { Stream stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); Stream ids = searchClient.getIds(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> { diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java index fc05334..a07da2a 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/SmokeTest.java @@ -2,11 +2,15 @@ package org.xbib.elx.http.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.http.HttpAdminClient; import org.xbib.elx.http.HttpAdminClientProvider; import org.xbib.elx.http.HttpBulkClient; @@ -16,6 +20,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class SmokeTest { @@ -39,26 +44,22 @@ class SmokeTest { .put(helper.getHttpSettings()) .build()) { IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); - bulkClient.newIndex("test_smoke"); - bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.checkMapping("test_smoke"); - bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); - bulkClient.delete("test_smoke", "1"); - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.delete("test_smoke", "1"); - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.deleteIndex("test_smoke"); bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.flush(); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.checkMapping(indexDefinition); + bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete(indexDefinition, "1"); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete(indexDefinition, "1"); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.deleteIndex(indexDefinition); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); @@ -70,6 +71,17 @@ class SmokeTest { } assertNull(bulkClient.getBulkController().getLastBulkError()); adminClient.deleteIndex(indexDefinition); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject(); + indexDefinition.setMappings(Strings.toString(builder)); + bulkClient.newIndex(indexDefinition); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } } diff --git a/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java b/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java index 8e8a202..a5c4641 100644 --- a/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java +++ b/elx-http/src/test/java/org/xbib/elx/http/test/TestExtension.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -184,15 +183,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings() { - return Settings.builder() - .put("cluster.name", getClusterName()) - .put("path.home", getHome()) - //.put("cluster.initial_master_nodes", ) - //.put("discovery.seed_hosts", "127.0.0.1:9300") - .build(); - } - Settings getHttpSettings() { return Settings.builder() .put("cluster.name", getClusterName()) @@ -221,9 +211,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Node buildNode() { Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) - .put("node.name", "1" ) - .put("path.data", getHome() + "/data-1") + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("node.max_local_storage_nodes", 2) + .put("node.master", true) + .put("node.data", true) + .put("node.name", "1") + .put("cluster.initial_master_nodes", "1") + .put("discovery.seed_hosts", "127.0.0.1:9300") .build(); List> plugins = Collections.singletonList(Netty4Plugin.class); this.node = new MockNode(nodeSettings, plugins); diff --git a/elx-http/src/test/resources/log4j2-test.xml b/elx-http/src/test/resources/log4j2-test.xml index 367d41a..0f90ef1 100644 --- a/elx-http/src/test/resources/log4j2-test.xml +++ b/elx-http/src/test/resources/log4j2-test.xml @@ -6,7 +6,7 @@ - + diff --git a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java index 03ad4bc..04ae279 100644 --- a/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java +++ b/elx-node/src/main/java/org/xbib/elx/node/NodeClientHelper.java @@ -84,10 +84,12 @@ public class NodeClientHelper { } private static boolean isPrivateSettings(String key) { - return key.equals(Parameters.MAX_ACTIONS_PER_REQUEST.name()) || - key.equals(Parameters.MAX_CONCURRENT_REQUESTS.name()) || - key.equals(Parameters.MAX_VOLUME_PER_REQUEST.name()) || - key.equals(Parameters.FLUSH_INTERVAL.name()); + for (Parameters p : Parameters.values()) { + if (key.equals(p.getName())) { + return true; + } + } + return false; } private static class BulkNode extends Node { diff --git a/elx-node/src/main/java/org/xbib/elx/node/package-info.java b/elx-node/src/main/java/org/xbib/elx/node/package-info.java deleted file mode 100644 index c2a9dfb..0000000 --- a/elx-node/src/main/java/org/xbib/elx/node/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Node client extensions. - */ -package org.xbib.elx.node; diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java index 3a750ff..0512935 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/BulkClientTest.java @@ -2,16 +2,12 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; -import org.xbib.elx.node.NodeAdminClient; -import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -22,16 +18,15 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class BulkClientTest { private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName()); - private static final Long ACTIONS = 10000L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10000L; + private static final Long MAX_ACTIONS_PER_REQUEST = 100L; private final TestExtension.Helper helper; @@ -39,17 +34,29 @@ class BulkClientTest { this.helper = helper; } + @Test + void testNewIndex() throws Exception { + try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) + .setBulkClientProvider(NodeBulkClientProvider.class) + .put(helper.getNodeSettings()) + .put(Parameters.FLUSH_INTERVAL.getName(), "5s") + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + } + } + @Test void testSingleDoc() throws Exception { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build()) { - bulkClient.newIndex("test"); - bulkClient.index("test", "doc1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30L, TimeUnit.SECONDS); assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { @@ -59,111 +66,73 @@ class BulkClientTest { } } - @Test - void testNewIndex() throws Exception { - try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) - .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) - .build()) { - bulkClient.newIndex("test"); - } - } - - @Test - void testMapping() throws Exception { - try (NodeAdminClient adminClient = ClientBuilder.builder(helper.client()) - .setAdminClientProvider(NodeAdminClientProvider.class) - .put(helper.getNodeSettings()) - .build(); - NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) - .setBulkClientProvider(NodeBulkClientProvider.class) - .put(helper.getNodeSettings()) - .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(adminClient.getMapping("test").containsKey("properties")); - } - } - @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") .build()) { - bulkClient.newIndex("test"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); + bulkClient.stopBulk(indexDefinition); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } } @Test void testThreadedRandomDocs() throws Exception { int maxthreads = Runtime.getRuntime().availableProcessors(); - Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; + long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST; final long actions = ACTIONS; + long timeout = 120L; try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), maxActionsPerRequest) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) + .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), maxActionsPerRequest) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush .build()) { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test", settings); - bulkClient.startBulk("test", 0, 1000); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setStartBulkRefreshSeconds(0); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { executorService.execute(() -> { for (int i1 = 0; i1 < actions; i1++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } latch.countDown(); }); } - logger.info("waiting for latch..."); - if (latch.await(30L, TimeUnit.SECONDS)) { - logger.info("flush..."); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - logger.info("got all responses, executor service shutdown..."); + if (latch.await(timeout, TimeUnit.SECONDS)) { + bulkClient.waitForResponses(timeout, TimeUnit.SECONDS); executorService.shutdown(); - executorService.awaitTermination(30L, TimeUnit.SECONDS); - logger.info("pool is shut down"); + executorService.awaitTermination(timeout, TimeUnit.SECONDS); } else { - logger.warn("latch timeout"); + logger.error("latch timeout!"); } - bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); + bulkClient.refreshIndex(indexDefinition); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java index a6cfdc2..7ce77fc 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/DuplicateIDTest.java @@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -36,17 +38,17 @@ class DuplicateIDTest { try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { - bulkClient.newIndex("test"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertTrue(bulkClient.getSearchableDocs("test") < ACTIONS); + bulkClient.refreshIndex(indexDefinition); + assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java index 85c2e2d..4c83b37 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -2,7 +2,6 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.settings.Settings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; @@ -47,32 +46,30 @@ class IndexPruneTest { .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test_prune1", settings); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test_prune2", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune2"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test_prune3", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune3"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test_prune4", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune4"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - indexDefinition.setPrune(true); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); IndexRetention indexRetention = new DefaultIndexRetention(); + indexRetention.setDelta(2); + indexRetention.setMinToKeep(2); indexDefinition.setRetention(indexRetention); indexDefinition.setEnabled(true); + indexDefinition.setPrune(true); IndexPruneResult indexPruneResult = adminClient.pruneIndex(indexDefinition); logger.info("prune result = " + indexPruneResult); assertTrue(indexPruneResult.getDeletedIndices().contains("test_prune1")); @@ -81,7 +78,9 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - list.add(adminClient.isIndexExists(index)); + IndexDefinition indexDefinition1 = new DefaultIndexDefinition(index, null); + indexDefinition1.setFullIndexName(index); + list.add(adminClient.isIndexExists(indexDefinition1)); } logger.info(list); assertFalse(list.get(0)); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index d3538b3..69aaa1c 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -3,7 +3,6 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -17,10 +16,11 @@ import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; import java.util.Arrays; +import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -45,46 +45,42 @@ class IndexShiftTest { .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) .build()) { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test_shift", settings); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setFullIndexName("test_shift"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); indexDefinition.setIndex("test"); indexDefinition.setFullIndexName("test_shift"); indexDefinition.setShift(true); - IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c")); + IndexShiftResult indexShiftResult = + adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - Map aliases = adminClient.getAliases("test_shift"); + Map aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); - String resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null); - assertEquals("test_shift", resolved); - aliases = adminClient.getAliases(resolved); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); + Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); + aliases = resolved.isPresent() ? + adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test")); - bulkClient.newIndex("test_shift2", settings); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); + indexDefinition.setFullIndexName("test_shift2"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift2", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - indexDefinition.setFullIndexName("test_shift2"); indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), (request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add() @@ -96,15 +92,15 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - aliases = adminClient.getAliases("test_shift2"); + aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test").stream().findFirst().orElse(null); - aliases = adminClient.getAliases(resolved); + resolved = adminClient.resolveAlias("test").stream().findFirst(); + aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java index b71f214..81f06d7 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SearchTest.java @@ -9,7 +9,9 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; import org.xbib.elx.node.NodeBulkClient; import org.xbib.elx.node.NodeBulkClientProvider; @@ -38,20 +40,22 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); try (NodeBulkClient bulkClient = ClientBuilder.builder(helper.client()) .setBulkClientProvider(NodeBulkClientProvider.class) .put(helper.getNodeSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .build()) { - bulkClient.newIndex("test"); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); + bulkClient.stopBulk(indexDefinition); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); @@ -62,23 +66,39 @@ class SearchTest { .setSearchClientProvider(NodeSearchClientProvider.class) .put(helper.getNodeSettings()) .build()) { + // test stream count Stream stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + // test stream docs + stream = searchClient.search(qb -> qb + .setIndices(indexDefinition.getFullIndexName()) + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMillis(10), 79); + final AtomicInteger hitcount = new AtomicInteger(); + stream.forEach(hit -> hitcount.incrementAndGet()); + assertEquals(numactions, hitcount.get()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + // test stream doc ids Stream ids = searchClient.getIds(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); - final AtomicInteger idcount = new AtomicInteger(0); + final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); - assertEquals(275, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java index 998ad3a..883b8f2 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/SmokeTest.java @@ -2,11 +2,15 @@ package org.xbib.elx.node.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.common.ClientBuilder; import org.xbib.elx.api.IndexDefinition; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.node.NodeAdminClient; import org.xbib.elx.node.NodeAdminClientProvider; import org.xbib.elx.node.NodeBulkClient; @@ -16,6 +20,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class SmokeTest { @@ -39,26 +44,25 @@ class SmokeTest { .put(helper.getNodeSettings()) .build()) { IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + new DefaultIndexDefinition(adminClient, "test_smoke", "doc", Settings.EMPTY); + assertEquals("test_smoke", indexDefinition.getIndex()); + assertTrue(indexDefinition.getFullIndexName().startsWith("test_smoke")); assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); - bulkClient.newIndex("test_smoke"); - bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.checkMapping("test_smoke"); - bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); - bulkClient.delete("test_smoke", "1"); - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.delete("test_smoke", "1"); - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.deleteIndex("test_smoke"); + indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.flush(); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.checkMapping(indexDefinition); + bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete(indexDefinition, "1"); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete(indexDefinition, "1"); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.deleteIndex(indexDefinition); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); @@ -70,6 +74,17 @@ class SmokeTest { } assertNull(bulkClient.getBulkController().getLastBulkError()); adminClient.deleteIndex(indexDefinition); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject(); + indexDefinition.setMappings(Strings.toString(builder)); + bulkClient.newIndex(indexDefinition); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } } diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java index 102e6f8..51b865f 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/TestExtension.java @@ -184,6 +184,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft .put("cluster.name", getClusterName()) .put("path.home", getHome()) .put("node.max_local_storage_nodes", 2) + .put("cluster.initial_master_nodes", "1") + .put("discovery.seed_hosts", "127.0.0.1:9300") .build(); } @@ -206,8 +208,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Node buildNode() { Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("node.max_local_storage_nodes", 2) + .put("node.master", true) + .put("node.data", true) .put("node.name", "1") + .put("cluster.initial_master_nodes", "1") + .put("discovery.seed_hosts", "127.0.0.1:9300") .build(); List> plugins = Collections.singletonList(Netty4Plugin.class); this.node = new MockNode(nodeSettings, plugins); diff --git a/elx-node/src/test/resources/log4j2-test.xml b/elx-node/src/test/resources/log4j2-test.xml index d2a79b4..bbe8a00 100644 --- a/elx-node/src/test/resources/log4j2-test.xml +++ b/elx-node/src/test/resources/log4j2-test.xml @@ -6,7 +6,7 @@ - + diff --git a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java index f7a0be2..555d31a 100644 --- a/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java +++ b/elx-transport/src/main/java/org/xbib/elx/transport/TransportClientHelper.java @@ -75,7 +75,7 @@ public class TransportClientHelper { Collection addrs = findAddresses(settings); if (!connect(transportClient, addrs, settings.getAsBoolean("autodiscover", false))) { throw new NoNodeAvailableException("no cluster nodes available, check settings " - + settings.toString()); + + Strings.toString(settings)); } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java index 45d2b07..98a684f 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/BulkClientTest.java @@ -2,17 +2,12 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; -import org.xbib.elx.transport.TransportAdminClient; -import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -23,16 +18,15 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class BulkClientTest { private static final Logger logger = LogManager.getLogger(BulkClientTest.class.getName()); - private static final Long ACTIONS = 10000L; + private static final Long ACTIONS = 100000L; - private static final Long MAX_ACTIONS_PER_REQUEST = 10000L; + private static final Long MAX_ACTIONS_PER_REQUEST = 100L; private final TestExtension.Helper helper; @@ -45,13 +39,13 @@ class BulkClientTest { final TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(30)) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "30s") .build(); try { - bulkClient.newIndex("test"); - bulkClient.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest bulkClient.waitForResponses(30L, TimeUnit.SECONDS); } finally { assertEquals(1, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); @@ -68,123 +62,82 @@ class BulkClientTest { final TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5)) + .put(Parameters.FLUSH_INTERVAL.getName(), "5s") .build(); - bulkClient.newIndex("test"); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); bulkClient.close(); } - @Test - void testMapping() throws Exception { - try (TransportAdminClient adminClient = ClientBuilder.builder() - .setAdminClientProvider(TransportAdminClientProvider.class) - .put(helper.getTransportSettings()) - .build(); - TransportBulkClient bulkClient = ClientBuilder.builder() - .setBulkClientProvider(TransportBulkClientProvider.class) - .put(helper.getTransportSettings()) - .build()) { - XContentBuilder builder = JsonXContent.contentBuilder() - .startObject() - .startObject("properties") - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject(); - bulkClient.newIndex("test", Settings.EMPTY, builder); - assertTrue(adminClient.getMapping("test").containsKey("properties")); - } - } - @Test void testRandomDocs() throws Exception { long numactions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .build(); - try { - bulkClient.newIndex("test"); + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); } } @Test void testThreadedRandomDocs() throws Exception { - int maxthreads = Runtime.getRuntime().availableProcessors(); + final int maxthreads = Runtime.getRuntime().availableProcessors(); final long actions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + final long timeout = 120L; + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_CONCURRENT_REQUESTS.name(), maxthreads * 2) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60)) - .put(Parameters.ENABLE_BULK_LOGGING.name(), "true") - .build(); - try { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test", settings); - bulkClient.startBulk("test", 0, 1000); - logger.info("index created"); + .put(Parameters.MAX_CONCURRENT_REQUESTS.getName(), maxthreads) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.FLUSH_INTERVAL.getName(), "60s") // disable flush + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); ExecutorService executorService = Executors.newFixedThreadPool(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads); for (int i = 0; i < maxthreads; i++) { executorService.execute(() -> { for (int i1 = 0; i1 < actions; i1++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } latch.countDown(); }); } - logger.info("waiting for latch..."); - if (latch.await(30L, TimeUnit.SECONDS)) { - logger.info("flush..."); - bulkClient.flush(); - bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - logger.info("got all responses, executor service shutdown..."); + if (latch.await(timeout, TimeUnit.SECONDS)) { + bulkClient.waitForResponses(timeout, TimeUnit.SECONDS); executorService.shutdown(); - executorService.awaitTermination(30L, TimeUnit.SECONDS); - logger.info("pool is shut down"); + executorService.awaitTermination(timeout, TimeUnit.SECONDS); } else { - logger.warn("latch timeout"); + logger.error("latch timeout!"); } - bulkClient.stopBulk("test", 30L, TimeUnit.SECONDS); + bulkClient.stopBulk(indexDefinition); + bulkClient.refreshIndex(indexDefinition); + assertEquals(maxthreads * actions, bulkClient.getSearchableDocs(indexDefinition)); assertEquals(maxthreads * actions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - } catch (NoNodeAvailableException e) { - logger.warn("skipping, no node available"); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } assertNull(bulkClient.getBulkController().getLastBulkError()); - bulkClient.refreshIndex("test"); - assertEquals(maxthreads * actions, bulkClient.getSearchableDocs("test")); - bulkClient.close(); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java index c2c8dae..4f9e6f5 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/DuplicateIDTest.java @@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; @@ -33,23 +35,20 @@ class DuplicateIDTest { @Test void testDuplicateDocIDs() throws Exception { long numactions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) .put(helper.getTransportSettings()) - .build(); - try { - bulkClient.newIndex("test_dup"); + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test_dup", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test_dup"); - assertTrue(bulkClient.getSearchableDocs("test_dup") < ACTIONS); - } finally { - bulkClient.close(); + bulkClient.refreshIndex(indexDefinition); + assertTrue(bulkClient.getSearchableDocs(indexDefinition) < ACTIONS); assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java index f24a12a..0347fb9 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexPruneTest.java @@ -2,7 +2,6 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.settings.Settings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; @@ -47,28 +46,24 @@ class IndexPruneTest { .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) .build()) { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); indexDefinition.setIndex("test_prune"); indexDefinition.setFullIndexName("test_prune1"); - bulkClient.newIndex("test_prune1", settings); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test_prune2", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune2"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test_prune3", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune3"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); - bulkClient.newIndex("test_prune4", settings); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setFullIndexName("test_prune4"); + bulkClient.newIndex(indexDefinition); indexDefinition.setShift(true); - adminClient.shiftIndex(indexDefinition, Collections.emptyList()); + adminClient.shiftIndex(indexDefinition, Collections.emptyList(), null); indexDefinition.setPrune(true); IndexRetention indexRetention = new DefaultIndexRetention(); indexDefinition.setRetention(indexRetention); @@ -81,7 +76,9 @@ class IndexPruneTest { assertFalse(indexPruneResult.getDeletedIndices().contains("test_prune4")); List list = new ArrayList<>(); for (String index : Arrays.asList("test_prune1", "test_prune2", "test_prune3", "test_prune4")) { - list.add(adminClient.isIndexExists(index)); + IndexDefinition indexDefinition1 = new DefaultIndexDefinition(index, null); + indexDefinition1.setFullIndexName(index); + list.add(adminClient.isIndexExists(indexDefinition1)); } logger.info(list); assertFalse(list.get(0)); diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java index 88f9b15..74f667a 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/IndexShiftTest.java @@ -3,7 +3,6 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -17,10 +16,11 @@ import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import java.util.Arrays; +import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -37,55 +37,47 @@ class IndexShiftTest { @Test void testIndexShift() throws Exception { - final TransportAdminClient adminClient = ClientBuilder.builder() + try (final TransportAdminClient adminClient = ClientBuilder.builder() .setAdminClientProvider(TransportAdminClientProvider.class) .put(helper.getTransportSettings()) .build(); final TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .build(); - try { - Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build(); - bulkClient.newIndex("test_shift1234", settings); + .build()) { + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + indexDefinition.setFullIndexName("test_shift"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift1234", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - IndexDefinition indexDefinition = new DefaultIndexDefinition(); - indexDefinition.setIndex("test_shift"); - indexDefinition.setFullIndexName("test_shift1234"); indexDefinition.setShift(true); - IndexShiftResult indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c")); + IndexShiftResult indexShiftResult = + adminClient.shiftIndex(indexDefinition, Arrays.asList("a", "b", "c"), null); assertTrue(indexShiftResult.getNewAliases().contains("a")); assertTrue(indexShiftResult.getNewAliases().contains("b")); assertTrue(indexShiftResult.getNewAliases().contains("c")); assertTrue(indexShiftResult.getMovedAliases().isEmpty()); - Map aliases = adminClient.getAliases("test_shift1234"); + Map aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test_shift")); - String resolved = adminClient.resolveAlias("test_shift").stream().findFirst().orElse(null); - assertNotNull(resolved); - aliases = adminClient.getAliases(resolved); + assertTrue(aliases.containsKey(indexDefinition.getIndex())); + Optional resolved = adminClient.resolveAlias(indexDefinition.getIndex()).stream().findFirst(); + aliases = resolved.isPresent() ? + adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); - assertTrue(aliases.containsKey("test_shift")); - bulkClient.newIndex("test_shift5678", settings); + indexDefinition.setFullIndexName("test_shift2"); + bulkClient.newIndex(indexDefinition); for (int i = 0; i < 1; i++) { - bulkClient.index("test_shift5678", helper.randomString(1), false, + bulkClient.index(indexDefinition, helper.randomString(1), false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - indexDefinition.setFullIndexName("test_shift5678"); indexDefinition.setShift(true); indexShiftResult = adminClient.shiftIndex(indexDefinition, Arrays.asList("d", "e", "f"), (request, index, alias) -> request.addAliasAction(IndicesAliasesRequest.AliasActions.add() @@ -97,24 +89,21 @@ class IndexShiftTest { assertTrue(indexShiftResult.getMovedAliases().contains("a")); assertTrue(indexShiftResult.getMovedAliases().contains("b")); assertTrue(indexShiftResult.getMovedAliases().contains("c")); - aliases = adminClient.getAliases("test_shift5678"); + aliases = adminClient.getAliases(indexDefinition.getFullIndexName()); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - resolved = adminClient.resolveAlias("test_shift").stream().findFirst().orElse(null); - aliases = adminClient.getAliases(resolved); + resolved = adminClient.resolveAlias("test").stream().findFirst(); + aliases = resolved.isPresent() ? adminClient.getAliases(resolved.get()) : Collections.emptyMap(); assertTrue(aliases.containsKey("a")); assertTrue(aliases.containsKey("b")); assertTrue(aliases.containsKey("c")); assertTrue(aliases.containsKey("d")); assertTrue(aliases.containsKey("e")); assertTrue(aliases.containsKey("f")); - } finally { - adminClient.close(); - bulkClient.close(); if (bulkClient.getBulkController().getLastBulkError() != null) { logger.error("error", bulkClient.getBulkController().getLastBulkError()); } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java index d573ebd..3aafe38 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SearchTest.java @@ -4,20 +4,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.common.Parameters; import org.xbib.elx.transport.TransportBulkClient; import org.xbib.elx.transport.TransportBulkClientProvider; import org.xbib.elx.transport.TransportSearchClient; import org.xbib.elx.transport.TransportSearchClientProvider; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -40,48 +40,64 @@ class SearchTest { @Test void testDocStream() throws Exception { long numactions = ACTIONS; - final TransportBulkClient bulkClient = ClientBuilder.builder() + IndexDefinition indexDefinition = new DefaultIndexDefinition("test", "doc"); + try (TransportBulkClient bulkClient = ClientBuilder.builder() .setBulkClientProvider(TransportBulkClientProvider.class) .put(helper.getTransportSettings()) - .put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST) - .build(); - try (bulkClient) { - bulkClient.newIndex("test"); + .put(Parameters.MAX_ACTIONS_PER_REQUEST.getName(), MAX_ACTIONS_PER_REQUEST) + .build()) { + bulkClient.newIndex(indexDefinition); + bulkClient.startBulk(indexDefinition); for (int i = 0; i < ACTIONS; i++) { - bulkClient.index("test", null, false, + bulkClient.index(indexDefinition, null, false, "{ \"name\" : \"" + helper.randomString(32) + "\"}"); } - bulkClient.flush(); bulkClient.waitForResponses(30L, TimeUnit.SECONDS); - bulkClient.refreshIndex("test"); - assertEquals(numactions, bulkClient.getSearchableDocs("test")); + bulkClient.refreshIndex(indexDefinition); + assertEquals(numactions, bulkClient.getSearchableDocs(indexDefinition)); + assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); + if (bulkClient.getBulkController().getLastBulkError() != null) { + logger.error("error", bulkClient.getBulkController().getLastBulkError()); + } + assertNull(bulkClient.getBulkController().getLastBulkError()); } - assertEquals(numactions, bulkClient.getBulkController().getBulkMetric().getSucceeded().getCount()); - if (bulkClient.getBulkController().getLastBulkError() != null) { - logger.error("error", bulkClient.getBulkController().getLastBulkError()); - } - assertNull(bulkClient.getBulkController().getLastBulkError()); try (TransportSearchClient searchClient = ClientBuilder.builder() .setSearchClientProvider(TransportSearchClientProvider.class) .put(helper.getTransportSettings()) .build()) { + // test stream count Stream stream = searchClient.search(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery()), TimeValue.timeValueMillis(100), 579); long count = stream.count(); assertEquals(numactions, count); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + // test stream docs + stream = searchClient.search(qb -> qb + .setIndices(indexDefinition.getFullIndexName()) + .setQuery(QueryBuilders.matchAllQuery()), + TimeValue.timeValueMillis(10), 79); + final AtomicInteger hitcount = new AtomicInteger(); + stream.forEach(hit -> hitcount.incrementAndGet()); + assertEquals(numactions, hitcount.get()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(2L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + // test stream doc ids Stream ids = searchClient.getIds(qb -> qb - .setIndices("test") + .setIndices(indexDefinition.getFullIndexName()) .setQuery(QueryBuilders.matchAllQuery())); - final AtomicInteger idcount = new AtomicInteger(0); + final AtomicInteger idcount = new AtomicInteger(); ids.forEach(id -> idcount.incrementAndGet()); assertEquals(numactions, idcount.get()); - assertEquals(275, searchClient.getSearchMetric().getQueries().getCount()); - assertEquals(273, searchClient.getSearchMetric().getSucceededQueries().getCount()); - assertEquals(2, searchClient.getSearchMetric().getEmptyQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getFailedQueries().getCount()); - assertEquals(0, searchClient.getSearchMetric().getTimeoutQueries().getCount()); + assertEquals(1542L, searchClient.getSearchMetric().getQueries().getCount()); + assertEquals(1539L, searchClient.getSearchMetric().getSucceededQueries().getCount()); + assertEquals(3L, searchClient.getSearchMetric().getEmptyQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getFailedQueries().getCount()); + assertEquals(0L, searchClient.getSearchMetric().getTimeoutQueries().getCount()); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java index 53e5758..10c94ba 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/SmokeTest.java @@ -2,11 +2,15 @@ package org.xbib.elx.transport.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.xbib.elx.api.IndexDefinition; import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.common.DefaultIndexDefinition; import org.xbib.elx.transport.TransportAdminClient; import org.xbib.elx.transport.TransportAdminClientProvider; import org.xbib.elx.transport.TransportBulkClient; @@ -16,6 +20,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(TestExtension.class) class SmokeTest { @@ -39,26 +44,25 @@ class SmokeTest { .put(helper.getTransportSettings()) .build()) { IndexDefinition indexDefinition = - adminClient.buildIndexDefinitionFromSettings("test_smoke", Settings.EMPTY); + new DefaultIndexDefinition(adminClient, "test", "doc", Settings.EMPTY); + assertEquals("test", indexDefinition.getIndex()); + assertTrue(indexDefinition.getFullIndexName().startsWith("test")); assertEquals(0, indexDefinition.getReplicaLevel()); assertEquals(helper.getClusterName(), adminClient.getClusterName()); - bulkClient.newIndex("test_smoke"); - bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.checkMapping("test_smoke"); - bulkClient.update("test_smoke", "1", "{ \"name\" : \"Another name\"}"); - bulkClient.delete("test_smoke", "1"); - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - bulkClient.index("test_smoke", "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.delete("test_smoke", "1"); - bulkClient.flush(); - bulkClient.waitForResponses(30, TimeUnit.SECONDS); - adminClient.deleteIndex("test_smoke"); + indexDefinition.setType("doc"); bulkClient.newIndex(indexDefinition); - bulkClient.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}"); - bulkClient.flush(); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.checkMapping(indexDefinition); + bulkClient.update(indexDefinition, "1", "{ \"name\" : \"Another name\"}"); + bulkClient.delete(indexDefinition, "1"); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); + bulkClient.delete(indexDefinition, "1"); + bulkClient.waitForResponses(30, TimeUnit.SECONDS); + adminClient.deleteIndex(indexDefinition); + bulkClient.newIndex(indexDefinition); + bulkClient.index(indexDefinition, "1", true, "{ \"name\" : \"Hello World\"}"); bulkClient.waitForResponses(30, TimeUnit.SECONDS); adminClient.updateReplicaLevel(indexDefinition, 2); int replica = adminClient.getReplicaLevel(indexDefinition); @@ -70,6 +74,17 @@ class SmokeTest { } assertNull(bulkClient.getBulkController().getLastBulkError()); adminClient.deleteIndex(indexDefinition); + XContentBuilder builder = JsonXContent.contentBuilder() + .startObject() + .startObject("properties") + .startObject("location") + .field("type", "geo_point") + .endObject() + .endObject() + .endObject(); + indexDefinition.setMappings(Strings.toString(builder)); + bulkClient.newIndex(indexDefinition); + assertTrue(adminClient.getMapping(indexDefinition).containsKey("properties")); } } } diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java index 4139001..f8a4733 100644 --- a/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java +++ b/elx-transport/src/test/java/org/xbib/elx/transport/test/TestExtension.java @@ -176,15 +176,6 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft return cluster; } - Settings getNodeSettings() { - return Settings.builder() - .put("cluster.name", getClusterName()) - .put("path.home", getHome()) - .put("cluster.initial_master_nodes", "1") - .put("discovery.seed_hosts", "127.0.0.1:9300") - .build(); - } - Settings getTransportSettings() { return Settings.builder() .put("cluster.name", cluster) @@ -213,8 +204,14 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft private Node buildNode() { Settings nodeSettings = Settings.builder() - .put(getNodeSettings()) + .put("cluster.name", getClusterName()) + .put("path.home", getHome()) + .put("node.max_local_storage_nodes", 2) + .put("node.master", true) + .put("node.data", true) .put("node.name", "1") + .put("cluster.initial_master_nodes", "1") + .put("discovery.seed_hosts", "127.0.0.1:9300") .build(); List> plugins = Collections.singletonList(Netty4Plugin.class); this.node = new MockNode(nodeSettings, plugins); diff --git a/elx-transport/src/test/resources/log4j2-test.xml b/elx-transport/src/test/resources/log4j2-test.xml index d2a79b4..0f90ef1 100644 --- a/elx-transport/src/test/resources/log4j2-test.xml +++ b/elx-transport/src/test/resources/log4j2-test.xml @@ -6,7 +6,7 @@ - + @@ -14,4 +14,4 @@ - \ No newline at end of file + diff --git a/gradle.properties b/gradle.properties index e29acd9..e2be6c6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,11 +4,11 @@ version = 7.10.2.0 gradle.wrapper.version = 6.6.1 xbib-metrics.version = 2.1.0 -xbib-netty-http.version = 4.1.60.0 +xbib-netty-http.version = 4.1.63.0 elasticsearch.version = 7.10.2 # ES 7.10.2 uses Jackson 2.10.4 jackson.version = 2.12.1 -netty.version = 4.1.60.Final -tcnative.version = 2.0.36.Final +netty.version = 4.1.63.Final +tcnative.version = 2.0.38.Final bouncycastle.version = 1.64 log4j.version = 2.14.0 diff --git a/gradle/test/junit5.gradle b/gradle/test/junit5.gradle index 9654bab..9ee64d3 100644 --- a/gradle/test/junit5.gradle +++ b/gradle/test/junit5.gradle @@ -1,5 +1,5 @@ -def junitVersion = project.hasProperty('junit.version')?project.property('junit.version'):'5.6.2' +def junitVersion = project.hasProperty('junit.version')?project.property('junit.version'):'5.7.0' def hamcrestVersion = project.hasProperty('hamcrest.version')?project.property('hamcrest.version'):'2.2' dependencies { @@ -13,6 +13,28 @@ test { useJUnitPlatform() // for Lucene to access jdk.internal.ref and jdk.internal.misc in Java 11+ jvmArgs = [ + // gradle default of 512m is too less for ES bulk indexing + '-Xmx2g', + '-Xms2g', + // do we need tricks with G1GC and real memory circuit breaker? + /*'-XX:+UseG1GC', + '-XX:+ParallelRefProcEnabled', + '-XX:MaxGCPauseMillis=50', + '-XX:+UnlockExperimentalVMOptions', + '-XX:+DisableExplicitGC', + '-XX:+AlwaysPreTouch', + '-XX:G1NewSizePercent=30', + '-XX:G1MaxNewSizePercent=40', + '-XX:G1HeapRegionSize=8M', + '-XX:G1ReservePercent=20', + '-XX:G1HeapWastePercent=5', + '-XX:G1MixedGCCountTarget=4', + '-XX:InitiatingHeapOccupancyPercent=15', + '-XX:G1MixedGCLiveThresholdPercent=90', + '-XX:G1RSetUpdatingPauseTimePercent=5', + '-XX:SurvivorRatio=32', + '-XX:+PerfDisableSharedMem', + '-XX:MaxTenuringThreshold=1',*/ '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED', '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED'