metric 1.2.0, fix shift index, add prune index test
This commit is contained in:
parent
fcd99f801d
commit
22ab37b3e5
13 changed files with 215 additions and 40 deletions
15
build.gradle
15
build.gradle
|
@ -41,11 +41,19 @@ subprojects {
|
|||
testCompile "junit:junit:${project.property('junit.version')}"
|
||||
testCompile "org.apache.logging.log4j:log4j-core:${project.property('log4j.version')}"
|
||||
testCompile "org.apache.logging.log4j:log4j-slf4j-impl:${project.property('log4j.version')}"
|
||||
asciidoclet "org.xbib:asciidoclet:${project.property('asciidoclet.version')}"
|
||||
wagon "org.apache.maven.wagon:wagon-ssh:${project.property('wagon.version')}"
|
||||
}
|
||||
|
||||
sourceCompatibility = JavaVersion.VERSION_1_8
|
||||
targetCompatibility = JavaVersion.VERSION_1_8
|
||||
compileJava {
|
||||
sourceCompatibility = JavaVersion.VERSION_1_8
|
||||
targetCompatibility = JavaVersion.VERSION_1_8
|
||||
}
|
||||
|
||||
compileTestJava {
|
||||
sourceCompatibility = JavaVersion.VERSION_1_8
|
||||
targetCompatibility = JavaVersion.VERSION_1_8
|
||||
}
|
||||
|
||||
tasks.withType(JavaCompile) {
|
||||
options.compilerArgs << "-Xlint:all"
|
||||
|
@ -57,6 +65,7 @@ subprojects {
|
|||
test {
|
||||
jvmArgs =[
|
||||
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
|
||||
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
|
||||
'--add-opens=java.base/java.nio=ALL-UNNAMED'
|
||||
]
|
||||
systemProperty 'jna.debug_load', 'true'
|
||||
|
@ -78,7 +87,7 @@ subprojects {
|
|||
options.overview = "src/docs/asciidoclet/overview.adoc"
|
||||
options.addStringOption "-base-dir", "${projectDir}"
|
||||
options.addStringOption "-attribute",
|
||||
"name=${project.name},version=${project.version},title-link=https://github.com/xbib/${project.name}"
|
||||
"name=${project.name},version=${project.version},title-link=https://github.com/jprante/${project.name}"
|
||||
configure(options) {
|
||||
noTimestamp = true
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
dependencies {
|
||||
compile "org.xbib:metrics:${project.property('xbib-metrics.version')}"
|
||||
compile "org.xbib:metrics-common:${project.property('xbib-metrics.version')}"
|
||||
compile("org.elasticsearch:elasticsearch:${project.property('elasticsearch.version')}") {
|
||||
// exclude ES jackson yaml, cbor, smile versions
|
||||
exclude group: 'com.fasterxml.jackson.dataformat'
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package org.xbib.elx.api;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.metrics.Count;
|
||||
import org.xbib.metrics.Metered;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
|
|
|
@ -10,10 +10,10 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public interface BulkProcessor extends Closeable, Flushable {
|
||||
|
||||
@SuppressWarnings("rawtype")
|
||||
@SuppressWarnings("rawtypes")
|
||||
BulkProcessor add(ActionRequest request);
|
||||
|
||||
@SuppressWarnings("rawtype")
|
||||
@SuppressWarnings("rawtypes")
|
||||
BulkProcessor add(ActionRequest request, Object payload);
|
||||
|
||||
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
|
||||
|
|
|
@ -309,7 +309,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException {
|
||||
public ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) {
|
||||
ensureActive();
|
||||
if (index == null) {
|
||||
logger.warn("no index name given to create index");
|
||||
|
@ -378,18 +378,18 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(String index, String id, boolean create, String source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(String index, String id, boolean create, BytesReference source) {
|
||||
return index(new IndexRequest(index, TYPE_NAME, id).create(create)
|
||||
.source(source));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient index(IndexRequest indexRequest) {
|
||||
ensureActive();
|
||||
|
@ -411,12 +411,14 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
|
|||
|
||||
@Override
|
||||
public ExtendedClient update(String index, String id, BytesReference source) {
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id).doc(source));
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id)
|
||||
.doc(source));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtendedClient update(String index, String id, String source) {
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id).doc(source.getBytes(StandardCharsets.UTF_8)));
|
||||
return update(new UpdateRequest(index, TYPE_NAME, id)
|
||||
.doc(source.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -439,18 +441,22 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
|
|||
RecoveryRequest recoveryRequest = new RecoveryRequest();
|
||||
recoveryRequest.indices(index);
|
||||
recoveryRequest.activeOnly(true);
|
||||
RecoveryResponse response = client.execute(RecoveryAction.INSTANCE, recoveryRequest).actionGet();
|
||||
int shards = response.getTotalShards();
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
.indices(new String[]{index})
|
||||
.waitForActiveShards(shards)
|
||||
.timeout(timeout);
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
logger.error("timeout waiting for recovery");
|
||||
return false;
|
||||
RecoveryResponse recoveryResponse = client.execute(RecoveryAction.INSTANCE, recoveryRequest).actionGet();
|
||||
if (recoveryResponse.hasRecoveries()) {
|
||||
int shards = recoveryResponse.getTotalShards();
|
||||
logger.info("shards = {}", shards);
|
||||
logger.info(recoveryResponse.shardRecoveryStates());
|
||||
TimeValue timeout = toTimeValue(maxWaitTime, timeUnit);
|
||||
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest()
|
||||
.indices(new String[]{index})
|
||||
.waitForActiveShards(shards)
|
||||
.timeout(timeout);
|
||||
ClusterHealthResponse healthResponse =
|
||||
client.execute(ClusterHealthAction.INSTANCE, clusterHealthRequest).actionGet();
|
||||
if (healthResponse != null && healthResponse.isTimedOut()) {
|
||||
logger.error("timeout waiting for recovery");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -641,10 +647,10 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
|
|||
oldIndex, alias));
|
||||
if (filter != null) {
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, index).filter(filter));
|
||||
fullIndexName, alias).filter(filter));
|
||||
} else {
|
||||
indicesAliasesRequest.addAliasAction(new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD,
|
||||
fullIndexName, index));
|
||||
fullIndexName, alias));
|
||||
}
|
||||
moveAliases.add(alias);
|
||||
}
|
||||
|
@ -733,7 +739,7 @@ public abstract class AbstractExtendedClient implements ExtendedClient {
|
|||
if (m2.matches()) {
|
||||
Integer i2 = Integer.parseInt(m2.group(2));
|
||||
int kept = candidateIndices.size() - indicesToDelete.size();
|
||||
if ((delta == 0 || (delta > 0 && i1 - i2 > delta)) && mintokeep <= kept) {
|
||||
if ((delta == 0 || (delta > 0 && i1 - i2 >= delta)) && mintokeep <= kept) {
|
||||
indicesToDelete.add(s);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,10 +2,10 @@ package org.xbib.elx.common;
|
|||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.xbib.elx.api.BulkMetric;
|
||||
import org.xbib.metrics.Count;
|
||||
import org.xbib.metrics.CountMetric;
|
||||
import org.xbib.metrics.Meter;
|
||||
import org.xbib.metrics.Metered;
|
||||
import org.xbib.metrics.api.Count;
|
||||
import org.xbib.metrics.api.Metered;
|
||||
import org.xbib.metrics.common.CountMetric;
|
||||
import org.xbib.metrics.common.Meter;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
* @param request request
|
||||
* @return his bulk processor
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
public DefaultBulkProcessor add(ActionRequest request) {
|
||||
return add(request, null);
|
||||
|
@ -144,6 +145,7 @@ public class DefaultBulkProcessor implements BulkProcessor {
|
|||
* @param payload payload
|
||||
* @return his bulk processor
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Override
|
||||
public DefaultBulkProcessor add(ActionRequest request, Object payload) {
|
||||
internalAdd(request, payload);
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
package org.xbib.elx.node.test;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.junit.Test;
|
||||
import org.xbib.elx.api.IndexPruneResult;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
import org.xbib.elx.node.ExtendedNodeClient;
|
||||
import org.xbib.elx.node.ExtendedNodeClientProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class IndexPruneTest extends TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName());
|
||||
|
||||
@Test
|
||||
public void testPrune() throws IOException {
|
||||
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
|
||||
.provider(ExtendedNodeClientProvider.class)
|
||||
.build();
|
||||
try {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
client.newIndex("test1", settings);
|
||||
client.shiftIndex("test", "test1", Collections.emptyList());
|
||||
client.newIndex("test2", settings);
|
||||
client.shiftIndex("test", "test2", Collections.emptyList());
|
||||
client.newIndex("test3", settings);
|
||||
client.shiftIndex("test", "test3", Collections.emptyList());
|
||||
client.newIndex("test4", settings);
|
||||
client.shiftIndex("test", "test4", Collections.emptyList());
|
||||
|
||||
IndexPruneResult indexPruneResult =
|
||||
client.pruneIndex("test", "test4", 2, 2, true);
|
||||
|
||||
assertTrue(indexPruneResult.getDeletedIndices().contains("test1"));
|
||||
assertTrue(indexPruneResult.getDeletedIndices().contains("test2"));
|
||||
assertFalse(indexPruneResult.getDeletedIndices().contains("test3"));
|
||||
assertFalse(indexPruneResult.getDeletedIndices().contains("test4"));
|
||||
|
||||
List<Boolean> list = new ArrayList<>();
|
||||
for (String index : Arrays.asList("test1", "test2", "test3", "test4")) {
|
||||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
|
||||
indicesExistsRequest.indices(new String[] { index });
|
||||
IndicesExistsResponse indicesExistsResponse =
|
||||
client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
|
||||
list.add(indicesExistsResponse.isExists());
|
||||
}
|
||||
logger.info(list);
|
||||
assertFalse(list.get(0));
|
||||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -100,7 +100,6 @@ public class IndexShiftTest extends TestBase {
|
|||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.waitForResponses(30L, TimeUnit.SECONDS);
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
</Console>
|
||||
</appenders>
|
||||
<Loggers>
|
||||
<Root level="debug">
|
||||
<Root level="info">
|
||||
<AppenderRef ref="Console" />
|
||||
</Root>
|
||||
</Loggers>
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
package org.xbib.elx.transport;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.junit.Test;
|
||||
import org.xbib.elx.api.IndexPruneResult;
|
||||
import org.xbib.elx.common.ClientBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class IndexPruneTest extends TestBase {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(IndexShiftTest.class.getName());
|
||||
|
||||
@Test
|
||||
public void testPrune() throws IOException {
|
||||
final ExtendedTransportClient client = ClientBuilder.builder()
|
||||
.provider(ExtendedTransportClientProvider.class)
|
||||
.put(getTransportSettings())
|
||||
.build();
|
||||
try {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
client.newIndex("test1", settings);
|
||||
client.shiftIndex("test", "test1", Collections.emptyList());
|
||||
client.newIndex("test2", settings);
|
||||
client.shiftIndex("test", "test2", Collections.emptyList());
|
||||
client.newIndex("test3", settings);
|
||||
client.shiftIndex("test", "test3", Collections.emptyList());
|
||||
client.newIndex("test4", settings);
|
||||
client.shiftIndex("test", "test4", Collections.emptyList());
|
||||
|
||||
IndexPruneResult indexPruneResult =
|
||||
client.pruneIndex("test", "test4", 2, 2, true);
|
||||
|
||||
assertTrue(indexPruneResult.getDeletedIndices().contains("test1"));
|
||||
assertTrue(indexPruneResult.getDeletedIndices().contains("test2"));
|
||||
assertFalse(indexPruneResult.getDeletedIndices().contains("test3"));
|
||||
assertFalse(indexPruneResult.getDeletedIndices().contains("test4"));
|
||||
|
||||
List<Boolean> list = new ArrayList<>();
|
||||
for (String index : Arrays.asList("test1", "test2", "test3", "test4")) {
|
||||
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
|
||||
indicesExistsRequest.indices(new String[] { index });
|
||||
IndicesExistsResponse indicesExistsResponse =
|
||||
client.getClient().execute(IndicesExistsAction.INSTANCE, indicesExistsRequest).actionGet();
|
||||
list.add(indicesExistsResponse.isExists());
|
||||
}
|
||||
logger.info(list);
|
||||
assertFalse(list.get(0));
|
||||
assertFalse(list.get(1));
|
||||
assertTrue(list.get(2));
|
||||
assertTrue(list.get(3));
|
||||
} catch (NoNodeAvailableException e) {
|
||||
logger.warn("skipping, no node available");
|
||||
} finally {
|
||||
client.close();
|
||||
if (client.getBulkController().getLastBulkError() != null) {
|
||||
logger.error("error", client.getBulkController().getLastBulkError());
|
||||
}
|
||||
assertNull(client.getBulkController().getLastBulkError());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@
|
|||
</Console>
|
||||
</appenders>
|
||||
<Loggers>
|
||||
<Root level="debug">
|
||||
<Root level="info">
|
||||
<AppenderRef ref="Console" />
|
||||
</Root>
|
||||
</Loggers>
|
||||
|
|
|
@ -2,7 +2,7 @@ group = org.xbib
|
|||
name = elx
|
||||
version = 2.2.1.6
|
||||
|
||||
xbib-metrics.version = 1.1.0
|
||||
xbib-metrics.version = 1.2.0
|
||||
xbib-guice.version = 4.0.4
|
||||
|
||||
elasticsearch.version = 2.2.1
|
||||
|
|
Loading…
Reference in a new issue