diff --git a/build.gradle b/build.gradle index 090181a..2e44f5f 100644 --- a/build.gradle +++ b/build.gradle @@ -41,11 +41,19 @@ subprojects { testCompile "junit:junit:${project.property('junit.version')}" testCompile "org.apache.logging.log4j:log4j-core:${project.property('log4j.version')}" testCompile "org.apache.logging.log4j:log4j-slf4j-impl:${project.property('log4j.version')}" + asciidoclet "org.xbib:asciidoclet:${project.property('asciidoclet.version')}" wagon "org.apache.maven.wagon:wagon-ssh:${project.property('wagon.version')}" } - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + compileJava { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + } + + compileTestJava { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + } tasks.withType(JavaCompile) { options.compilerArgs << "-Xlint:all" @@ -57,6 +65,7 @@ subprojects { test { jvmArgs =[ '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED', + '--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED' ] systemProperty 'jna.debug_load', 'true' @@ -78,7 +87,7 @@ subprojects { options.overview = "src/docs/asciidoclet/overview.adoc" options.addStringOption "-base-dir", "${projectDir}" options.addStringOption "-attribute", - "name=${project.name},version=${project.version},title-link=https://github.com/xbib/${project.name}" + "name=${project.name},version=${project.version},title-link=https://github.com/jprante/${project.name}" configure(options) { noTimestamp = true } diff --git a/elx-api/build.gradle b/elx-api/build.gradle index 9e7b7be..47596cb 100644 --- a/elx-api/build.gradle +++ b/elx-api/build.gradle @@ -1,5 +1,5 @@ dependencies { - compile "org.xbib:metrics:${project.property('xbib-metrics.version')}" + compile "org.xbib:metrics-common:${project.property('xbib-metrics.version')}" compile("org.elasticsearch:elasticsearch:${project.property('elasticsearch.version')}") { // exclude ES jackson yaml, cbor, smile versions exclude group: 'com.fasterxml.jackson.dataformat' diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index 3a406fb..af825e5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,8 +1,8 @@ package org.xbib.elx.api; import org.elasticsearch.common.settings.Settings; -import org.xbib.metrics.Count; -import org.xbib.metrics.Metered; +import org.xbib.metrics.api.Count; +import org.xbib.metrics.api.Metered; import java.io.Closeable; diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index 2703acb..52b6d04 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -10,10 +10,10 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { - @SuppressWarnings("rawtype") + @SuppressWarnings("rawtypes") BulkProcessor add(ActionRequest request); - @SuppressWarnings("rawtype") + @SuppressWarnings("rawtypes") BulkProcessor add(ActionRequest request, Object payload); boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; diff --git a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java index 5ea81ae..4b97f4a 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java +++ b/elx-common/src/main/java/org/xbib/elx/common/AbstractExtendedClient.java @@ -309,7 +309,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { } @Override - public ExtendedClient newIndex(String index, Settings settings, Map mapping) throws IOException { + public ExtendedClient newIndex(String index, Settings settings, Map mapping) { ensureActive(); if (index == null) { logger.warn("no index name given to create index"); @@ -378,18 +378,18 @@ public abstract class AbstractExtendedClient implements ExtendedClient { return this; } - @Override - public ExtendedClient index(String index, String id, boolean create, BytesReference source) { - return index(new IndexRequest(index, TYPE_NAME, id).create(create) - .source(source)); - } - @Override public ExtendedClient index(String index, String id, boolean create, String source) { return index(new IndexRequest(index, TYPE_NAME, id).create(create) .source(source.getBytes(StandardCharsets.UTF_8))); } + @Override + public ExtendedClient index(String index, String id, boolean create, BytesReference source) { + return index(new IndexRequest(index, TYPE_NAME, id).create(create) + .source(source)); + } + @Override public ExtendedClient index(IndexRequest indexRequest) { ensureActive(); @@ -411,12 +411,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient { @Override public ExtendedClient update(String index, String id, BytesReference source) { - return update(new UpdateRequest(index, TYPE_NAME, id).doc(source)); + return update(new UpdateRequest(index, TYPE_NAME, id) + .doc(source)); } @Override public ExtendedClient update(String index, String id, String source) { - return update(new UpdateRequest(index, TYPE_NAME, id).doc(source.getBytes(StandardCharsets.UTF_8))); + return update(new UpdateRequest(index, TYPE_NAME, id) + .doc(source.getBytes(StandardCharsets.UTF_8))); } @Override @@ -439,18 +441,22 @@ public abstract class AbstractExtendedClient implements ExtendedClient { RecoveryRequest recoveryRequest = new RecoveryRequest(); recoveryRequest.indices(index); recoveryRequest.activeOnly(true); - RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, recoveryRequest).actionGet(); - int shards = response.getTotalShards(); - TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); - ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() - .indices(new String[]{index}) - .waitForActiveShards(shards) - .timeout(timeout); - ClusterHealthResponse healthResponse = - client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); - if (healthResponse != null && healthResponse.isTimedOut()) { - logger.error("timeout waiting for recovery"); - return false; + RecoveryResponse recoveryResponse = client.execute(RecoveryAction.INSTANCE, recoveryRequest).actionGet(); + if (recoveryResponse.hasRecoveries()) { + int shards = recoveryResponse.getTotalShards(); + logger.info("shards = {}", shards); + logger.info(recoveryResponse.shardRecoveryStates()); + TimeValue timeout = toTimeValue(maxWaitTime, timeUnit); + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest() + .indices(new String[]{index}) + .waitForActiveShards(shards) + .timeout(timeout); + ClusterHealthResponse healthResponse = + client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + logger.error("timeout waiting for recovery"); + return false; + } } return true; } @@ -641,10 +647,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient { oldIndex, alias)); if (filter != null) { indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, - fullIndexName, index).filter(filter)); + fullIndexName, alias).filter(filter)); } else { indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, - fullIndexName, index)); + fullIndexName, alias)); } moveAliases.add(alias); } @@ -733,7 +739,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient { if (m2.matches()) { Integer i2 = Integer.parseInt(m2.group(2)); int kept = candidateIndices.size() - indicesToDelete.size(); - if ((delta == 0 || (delta > 0 && i1 - i2 > delta)) && mintokeep <= kept) { + if ((delta == 0 || (delta > 0 && i1 - i2 >= delta)) && mintokeep <= kept) { indicesToDelete.add(s); } } diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java index a956c4d..8127e29 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkMetric.java @@ -2,10 +2,10 @@ package org.xbib.elx.common; import org.elasticsearch.common.settings.Settings; import org.xbib.elx.api.BulkMetric; -import org.xbib.metrics.Count; -import org.xbib.metrics.CountMetric; -import org.xbib.metrics.Meter; -import org.xbib.metrics.Metered; +import org.xbib.metrics.api.Count; +import org.xbib.metrics.api.Metered; +import org.xbib.metrics.common.CountMetric; +import org.xbib.metrics.common.Meter; import java.util.concurrent.Executors; diff --git a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java index 224f507..436d375 100644 --- a/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java +++ b/elx-common/src/main/java/org/xbib/elx/common/DefaultBulkProcessor.java @@ -132,6 +132,7 @@ public class DefaultBulkProcessor implements BulkProcessor { * @param request request * @return his bulk processor */ + @SuppressWarnings("rawtypes") @Override public DefaultBulkProcessor add(ActionRequest request) { return add(request, null); @@ -144,6 +145,7 @@ public class DefaultBulkProcessor implements BulkProcessor { * @param payload payload * @return his bulk processor */ + @SuppressWarnings("rawtypes") @Override public DefaultBulkProcessor add(ActionRequest request, Object payload) { internalAdd(request, payload); diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java new file mode 100644 index 0000000..b0b6428 --- /dev/null +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexPruneTest.java @@ -0,0 +1,80 @@ +package org.xbib.elx.node.test; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.common.settings.Settings; +import org.junit.Test; +import org.xbib.elx.api.IndexPruneResult; +import org.xbib.elx.common.ClientBuilder; +import org.xbib.elx.node.ExtendedNodeClient; +import org.xbib.elx.node.ExtendedNodeClientProvider; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class IndexPruneTest extends TestBase { + + private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName()); + + @Test + public void testPrune() throws IOException { + final ExtendedNodeClient client = ClientBuilder.builder(client("1")) + .provider(ExtendedNodeClientProvider.class) + .build(); + try { + Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + client.newIndex("test1", settings); + client.shiftIndex("test", "test1", Collections.emptyList()); + client.newIndex("test2", settings); + client.shiftIndex("test", "test2", Collections.emptyList()); + client.newIndex("test3", settings); + client.shiftIndex("test", "test3", Collections.emptyList()); + client.newIndex("test4", settings); + client.shiftIndex("test", "test4", Collections.emptyList()); + + IndexPruneResult indexPruneResult = + client.pruneIndex("test", "test4", 2, 2, true); + + assertTrue(indexPruneResult.getDeletedIndices().contains("test1")); + assertTrue(indexPruneResult.getDeletedIndices().contains("test2")); + assertFalse(indexPruneResult.getDeletedIndices().contains("test3")); + assertFalse(indexPruneResult.getDeletedIndices().contains("test4")); + + List list = new ArrayList<>(); + for (String index : Arrays.asList("test1", "test2", "test3", "test4")) { + IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); + indicesExistsRequest.indices(new String[] { index }); + IndicesExistsResponse indicesExistsResponse = + client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); + list.add(indicesExistsResponse.isExists()); + } + logger.info(list); + assertFalse(list.get(0)); + assertFalse(list.get(1)); + assertTrue(list.get(2)); + assertTrue(list.get(3)); + } catch (NoNodeAvailableException e) { + logger.warn("skipping, no node available"); + } finally { + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); + } + assertNull(client.getBulkController().getLastBulkError()); + } + } +} diff --git a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java index 8b31d89..6c900e8 100644 --- a/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java +++ b/elx-node/src/test/java/org/xbib/elx/node/test/IndexShiftTest.java @@ -100,7 +100,6 @@ public class IndexShiftTest extends TestBase { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.waitForResponses(30L, TimeUnit.SECONDS); client.close(); if (client.getBulkController().getLastBulkError() != null) { logger.error("error", client.getBulkController().getLastBulkError()); diff --git a/elx-node/src/test/resources/log4j2.xml b/elx-node/src/test/resources/log4j2.xml index 1258d7f..6c323f8 100644 --- a/elx-node/src/test/resources/log4j2.xml +++ b/elx-node/src/test/resources/log4j2.xml @@ -6,7 +6,7 @@ - + diff --git a/elx-transport/src/test/java/org/xbib/elx/transport/IndexPruneTest.java b/elx-transport/src/test/java/org/xbib/elx/transport/IndexPruneTest.java new file mode 100644 index 0000000..4ce1843 --- /dev/null +++ b/elx-transport/src/test/java/org/xbib/elx/transport/IndexPruneTest.java @@ -0,0 +1,79 @@ +package org.xbib.elx.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.common.settings.Settings; +import org.junit.Test; +import org.xbib.elx.api.IndexPruneResult; +import org.xbib.elx.common.ClientBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class IndexPruneTest extends TestBase { + + private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName()); + + @Test + public void testPrune() throws IOException { + final ExtendedTransportClient client = ClientBuilder.builder() + .provider(ExtendedTransportClientProvider.class) + .put(getTransportSettings()) + .build(); + try { + Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + client.newIndex("test1", settings); + client.shiftIndex("test", "test1", Collections.emptyList()); + client.newIndex("test2", settings); + client.shiftIndex("test", "test2", Collections.emptyList()); + client.newIndex("test3", settings); + client.shiftIndex("test", "test3", Collections.emptyList()); + client.newIndex("test4", settings); + client.shiftIndex("test", "test4", Collections.emptyList()); + + IndexPruneResult indexPruneResult = + client.pruneIndex("test", "test4", 2, 2, true); + + assertTrue(indexPruneResult.getDeletedIndices().contains("test1")); + assertTrue(indexPruneResult.getDeletedIndices().contains("test2")); + assertFalse(indexPruneResult.getDeletedIndices().contains("test3")); + assertFalse(indexPruneResult.getDeletedIndices().contains("test4")); + + List list = new ArrayList<>(); + for (String index : Arrays.asList("test1", "test2", "test3", "test4")) { + IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); + indicesExistsRequest.indices(new String[] { index }); + IndicesExistsResponse indicesExistsResponse = + client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet(); + list.add(indicesExistsResponse.isExists()); + } + logger.info(list); + assertFalse(list.get(0)); + assertFalse(list.get(1)); + assertTrue(list.get(2)); + assertTrue(list.get(3)); + } catch (NoNodeAvailableException e) { + logger.warn("skipping, no node available"); + } finally { + client.close(); + if (client.getBulkController().getLastBulkError() != null) { + logger.error("error", client.getBulkController().getLastBulkError()); + } + assertNull(client.getBulkController().getLastBulkError()); + } + } +} diff --git a/elx-transport/src/test/resources/log4j2.xml b/elx-transport/src/test/resources/log4j2.xml index 1258d7f..6c323f8 100644 --- a/elx-transport/src/test/resources/log4j2.xml +++ b/elx-transport/src/test/resources/log4j2.xml @@ -6,7 +6,7 @@ - + diff --git a/gradle.properties b/gradle.properties index 7317e63..ea165e1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ group = org.xbib name = elx version = 2.2.1.6 -xbib-metrics.version = 1.1.0 +xbib-metrics.version = 1.2.0 xbib-guice.version = 4.0.4 elasticsearch.version = 2.2.1