Gradle 3.2.1, remove annotation Nullable, rename ClientBuilder to Clients

This commit is contained in:
Jörg Prante 2017-01-03 14:37:23 +01:00
parent 409ced9c7a
commit 17d114feb1
22 changed files with 149 additions and 163 deletions

View file

@ -1,13 +1,8 @@
plugins { plugins {
id "org.sonarqube" version "2.2" id "org.sonarqube" version "2.2"
id "org.ajoberstar.github-pages" version "1.6.0-rc.1"
id "org.xbib.gradle.plugin.jbake" version "1.2.1"
} }
group = 'org.xbib'
version = '2.2.1.1'
printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" + printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" +
"Build: group: ${project.group} name: ${project.name} version: ${project.version}\n", "Build: group: ${project.group} name: ${project.name} version: ${project.version}\n",
InetAddress.getLocalHost(), InetAddress.getLocalHost(),
@ -28,7 +23,6 @@ apply plugin: 'findbugs'
apply plugin: 'pmd' apply plugin: 'pmd'
apply plugin: 'checkstyle' apply plugin: 'checkstyle'
apply plugin: "jacoco" apply plugin: "jacoco"
apply plugin: 'org.ajoberstar.github-pages'
apply from: 'gradle/ext.gradle' apply from: 'gradle/ext.gradle'
@ -45,9 +39,6 @@ sourceSets {
} }
} }
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
configurations { configurations {
wagon wagon
integrationTestCompile.extendsFrom testCompile integrationTestCompile.extendsFrom testCompile
@ -56,7 +47,9 @@ configurations {
dependencies { dependencies {
compile "org.xbib:metrics:1.0.0" compile "org.xbib:metrics:1.0.0"
compile "org.elasticsearch:elasticsearch:2.2.1" compile("org.elasticsearch:elasticsearch:2.2.1") {
exclude module: "securesm"
}
testCompile "net.java.dev.jna:jna:4.1.0" testCompile "net.java.dev.jna:jna:4.1.0"
testCompile "junit:junit:4.12" testCompile "junit:junit:4.12"
testCompile "org.apache.logging.log4j:log4j-core:2.7" testCompile "org.apache.logging.log4j:log4j-core:2.7"
@ -64,9 +57,12 @@ dependencies {
wagon 'org.apache.maven.wagon:wagon-ssh-external:2.10' wagon 'org.apache.maven.wagon:wagon-ssh-external:2.10'
} }
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8' [compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
tasks.withType(JavaCompile) { tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:all" << "-profile" << "compact3" options.compilerArgs << "-Xlint:all"
} }
task integrationTest(type: Test) { task integrationTest(type: Test) {

3
gradle.properties Normal file
View file

@ -0,0 +1,3 @@
group = org.xbib
name = elasticsearch-extras-client
version = 2.2.1.2

Binary file not shown.

View file

@ -1,6 +1,6 @@
#Tue Nov 01 14:46:00 CET 2016 #Tue Jan 03 14:13:22 CET 2017
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-3.2.1-all.zip

19
gradlew vendored
View file

@ -1,4 +1,4 @@
#!/usr/bin/env bash #!/usr/bin/env sh
############################################################################## ##############################################################################
## ##
@ -154,16 +154,19 @@ if $cygwin ; then
esac esac
fi fi
# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules # Escape application args
function splitJvmOpts() { save ( ) {
JVM_OPTS=("$@") for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
} }
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS APP_ARGS=$(save "$@")
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong # by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")" cd "$(dirname "$0")"
fi fi
exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" exec "$JAVACMD" "$@"

View file

@ -19,7 +19,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -51,8 +51,8 @@ public class BulkNodeClientTest extends NodeTestUtils {
@Test @Test
public void testNewIndexNodeClient() throws Exception { public void testNewIndexNodeClient() throws Exception {
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -66,8 +66,8 @@ public class BulkNodeClientTest extends NodeTestUtils {
@Test @Test
public void testMappingNodeClient() throws Exception { public void testMappingNodeClient() throws Exception {
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -96,9 +96,9 @@ public class BulkNodeClientTest extends NodeTestUtils {
@Test @Test
public void testSingleDocNodeClient() { public void testSingleDocNodeClient() {
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(30)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(30))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -106,7 +106,7 @@ public class BulkNodeClientTest extends NodeTestUtils {
client.newIndex("test"); client.newIndex("test");
client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
@ -126,9 +126,9 @@ public class BulkNodeClientTest extends NodeTestUtils {
@Test @Test
public void testRandomDocsNodeClient() throws Exception { public void testRandomDocsNodeClient() throws Exception {
long numactions = NUM_ACTIONS; long numactions = NUM_ACTIONS;
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -138,7 +138,7 @@ public class BulkNodeClientTest extends NodeTestUtils {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
@ -157,9 +157,9 @@ public class BulkNodeClientTest extends NodeTestUtils {
Long maxactions = MAX_ACTIONS; Long maxactions = MAX_ACTIONS;
final Long maxloop = NUM_ACTIONS; final Long maxloop = NUM_ACTIONS;
logger.info("NodeClient max={} maxactions={} maxloop={}", maxthreads, maxactions, maxloop); logger.info("NodeClient max={} maxactions={} maxloop={}", maxthreads, maxactions, maxloop);
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) .put(Clients.MAX_ACTIONS_PER_REQUEST, maxactions)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))// disable auto flush for this test .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))// disable auto flush for this test
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -183,7 +183,7 @@ public class BulkNodeClientTest extends NodeTestUtils {
latch.await(30, TimeUnit.SECONDS); latch.await(30, TimeUnit.SECONDS);
logger.info("flush..."); logger.info("flush...");
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
logger.info("got all responses, thread pool shutdown..."); logger.info("got all responses, thread pool shutdown...");
pool.shutdown(); pool.shutdown();
logger.info("pool is shut down"); logger.info("pool is shut down");

View file

@ -5,10 +5,9 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -29,8 +28,8 @@ public class BulkNodeDuplicateIDTest extends NodeTestUtils {
@Test @Test
public void testDuplicateDocIDs() throws Exception { public void testDuplicateDocIDs() throws Exception {
long numactions = NUM_ACTIONS; long numactions = NUM_ACTIONS;
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -40,7 +39,7 @@ public class BulkNodeDuplicateIDTest extends NodeTestUtils {
client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
client.refreshIndex("test"); client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE)
.setIndices("test") .setIndices("test")

View file

@ -8,7 +8,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.IndexAliasAdder; import org.xbib.elasticsearch.extras.client.IndexAliasAdder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -28,7 +28,7 @@ public class BulkNodeIndexAliasTest extends NodeTestUtils {
@Test @Test
public void testIndexAlias() throws Exception { public void testIndexAlias() throws Exception {
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -66,7 +66,7 @@ public class BulkNodeIndexAliasTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
client.shutdown(); client.shutdown();
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());

View file

@ -1,17 +1,21 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elasticsearch.extras.client.node;
import org.elasticsearch.action.admin.indices.stats.*; import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.IndexingStats;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -43,7 +47,7 @@ public class BulkNodeReplicaTest extends NodeTestUtils {
.put("index.number_of_replicas", 1) .put("index.number_of_replicas", 1)
.build(); .build();
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
@ -51,7 +55,7 @@ public class BulkNodeReplicaTest extends NodeTestUtils {
try { try {
client.newIndex("test1", settingsTest1, null) client.newIndex("test1", settingsTest1, null)
.newIndex("test2", settingsTest2, null); .newIndex("test2", settingsTest2, null);
client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
@ -59,7 +63,7 @@ public class BulkNodeReplicaTest extends NodeTestUtils {
client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(60)); client.waitForResponses("30s");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {

View file

@ -4,10 +4,9 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -38,19 +37,19 @@ public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils {
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.build(); .build();
final BulkNodeClient client = ClientBuilder.builder() final BulkNodeClient client = Clients.builder()
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkNodeClient(client("1")); .toBulkNodeClient(client("1"));
try { try {
client.newIndex("replicatest", settings, null); client.newIndex("replicatest", settings, null);
client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) { for (int i = 0; i < 12345; i++) {
client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel); shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel);
assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {

View file

@ -12,7 +12,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -48,9 +48,9 @@ public class BulkTransportClientTest extends NodeTestUtils {
@Test @Test
public void testBulkClient() throws IOException { public void testBulkClient() throws IOException {
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkTransportClient(); .toBulkTransportClient();
@ -76,10 +76,10 @@ public class BulkTransportClientTest extends NodeTestUtils {
@Test @Test
public void testSingleDocBulkClient() throws IOException { public void testSingleDocBulkClient() throws IOException {
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkTransportClient(); .toBulkTransportClient();
@ -87,7 +87,7 @@ public class BulkTransportClientTest extends NodeTestUtils {
client.newIndex("test"); client.newIndex("test");
client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -107,10 +107,10 @@ public class BulkTransportClientTest extends NodeTestUtils {
@Test @Test
public void testRandomDocsBulkClient() throws IOException { public void testRandomDocsBulkClient() throws IOException {
long numactions = NUM_ACTIONS; long numactions = NUM_ACTIONS;
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkTransportClient(); .toBulkTransportClient();
@ -120,7 +120,7 @@ public class BulkTransportClientTest extends NodeTestUtils {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -148,10 +148,10 @@ public class BulkTransportClientTest extends NodeTestUtils {
.put("index.number_of_replicas", 1) .put("index.number_of_replicas", 1)
.build(); .build();
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) .put(Clients.MAX_ACTIONS_PER_REQUEST, maxactions)
.put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkTransportClient(); .toBulkTransportClient();
@ -173,7 +173,7 @@ public class BulkTransportClientTest extends NodeTestUtils {
latch.await(30, TimeUnit.SECONDS); latch.await(30, TimeUnit.SECONDS);
logger.info("client flush ..."); logger.info("client flush ...");
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
logger.info("thread pool to be shut down ..."); logger.info("thread pool to be shut down ...");
pool.shutdown(); pool.shutdown();
logger.info("poot shut down"); logger.info("poot shut down");

View file

@ -5,10 +5,9 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -26,9 +25,9 @@ public class BulkTransportDuplicateIDTest extends NodeTestUtils {
@Test @Test
public void testDuplicateDocIDs() throws Exception { public void testDuplicateDocIDs() throws Exception {
long numactions = NUM_ACTIONS; long numactions = NUM_ACTIONS;
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS)
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
.toBulkTransportClient(); .toBulkTransportClient();
@ -38,7 +37,7 @@ public class BulkTransportDuplicateIDTest extends NodeTestUtils {
client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
client.refreshIndex("test"); client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE)
.setIndices("test") .setIndices("test")

View file

@ -1,17 +1,21 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elasticsearch.extras.client.transport;
import org.elasticsearch.action.admin.indices.stats.*; import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.IndexingStats;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -46,7 +50,7 @@ public class BulkTransportReplicaTest extends NodeTestUtils {
.put("index.number_of_replicas", 1) .put("index.number_of_replicas", 1)
.build(); .build();
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
@ -54,7 +58,7 @@ public class BulkTransportReplicaTest extends NodeTestUtils {
try { try {
client.newIndex("test1", settingsTest1, null) client.newIndex("test1", settingsTest1, null)
.newIndex("test2", settingsTest2, null); .newIndex("test2", settingsTest2, null);
client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
@ -62,7 +66,7 @@ public class BulkTransportReplicaTest extends NodeTestUtils {
client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(60)); client.waitForResponses("30s");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {

View file

@ -4,10 +4,9 @@ import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.ClientBuilder; import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
@ -39,7 +38,7 @@ public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils {
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.build(); .build();
final BulkTransportClient client = ClientBuilder.builder() final BulkTransportClient client = Clients.builder()
.put(getSettings()) .put(getSettings())
.setMetric(new SimpleBulkMetric()) .setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl()) .setControl(new SimpleBulkControl())
@ -47,12 +46,12 @@ public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils {
try { try {
client.newIndex("replicatest", settings, null); client.newIndex("replicatest", settings, null);
client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) { for (int i = 0; i < 12345; i++) {
client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses(TimeValue.timeValueSeconds(30)); client.waitForResponses("30s");
shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel); shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel);
assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {

View file

@ -194,7 +194,7 @@ public abstract class AbstractClient {
return shards; return shards;
} }
public void waitForCluster(String statusString, TimeValue timeout) throws IOException { public void waitForCluster(String statusString, String timeout) throws IOException {
if (client() == null) { if (client() == null) {
return; return;
} }
@ -255,7 +255,7 @@ public abstract class AbstractClient {
} }
public int updateReplicaLevel(String index, int level) throws IOException { public int updateReplicaLevel(String index, int level) throws IOException {
waitForCluster("YELLOW", TimeValue.timeValueSeconds(30)); waitForCluster("YELLOW", "30s");
updateIndexSetting(index, "number_of_replicas", level); updateIndexSetting(index, "number_of_replicas", level);
return waitForRecovery(index); return waitForRecovery(index);
} }

View file

@ -8,8 +8,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -48,8 +46,8 @@ public class BulkProcessor implements Closeable {
private volatile boolean closed = false; private volatile boolean closed = false;
private BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, private BulkProcessor(Client client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) { int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
this.bulkActions = bulkActions; this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytes(); this.bulkSize = bulkSize.bytes();
@ -159,35 +157,23 @@ public class BulkProcessor implements Closeable {
* @param payload payload * @param payload payload
* @return his bulk processor * @return his bulk processor
*/ */
public BulkProcessor add(ActionRequest<?> request, @Nullable Object payload) { public BulkProcessor add(ActionRequest<?> request, Object payload) {
internalAdd(request, payload); internalAdd(request, payload);
return this; return this;
} }
protected void ensureOpen() { private void ensureOpen() {
if (closed) { if (closed) {
throw new IllegalStateException("bulk process already closed"); throw new IllegalStateException("bulk process already closed");
} }
} }
private synchronized void internalAdd(ActionRequest<?> request, @Nullable Object payload) { private synchronized void internalAdd(ActionRequest<?> request, Object payload) {
ensureOpen(); ensureOpen();
bulkRequest.add(request, payload); bulkRequest.add(request, payload);
executeIfNeeded(); executeIfNeeded();
} }
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType)
throws Exception {
return add(data, defaultIndex, defaultType, null);
}
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultType, @Nullable Object payload) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true);
executeIfNeeded();
return this;
}
private void executeIfNeeded() { private void executeIfNeeded() {
ensureOpen(); ensureOpen();
if (!isOverTheLimit()) { if (!isOverTheLimit()) {

View file

@ -5,8 +5,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -102,7 +100,7 @@ public interface ClientMethods extends Parameters {
* @param maxVolume maximum volume * @param maxVolume maximum volume
* @return this ingest * @return this ingest
*/ */
ClientMethods maxVolumePerRequest(ByteSizeValue maxVolume); ClientMethods maxVolumePerRequest(String maxVolume);
/** /**
* Set the flush interval for automatic flushing outstanding ingest requests. * Set the flush interval for automatic flushing outstanding ingest requests.
@ -110,7 +108,7 @@ public interface ClientMethods extends Parameters {
* @param flushInterval the flush interval, default is 30 seconds * @param flushInterval the flush interval, default is 30 seconds
* @return this ingest * @return this ingest
*/ */
ClientMethods flushIngestInterval(TimeValue flushInterval); ClientMethods flushIngestInterval(String flushInterval);
/** /**
* Set mapping. * Set mapping.
@ -243,12 +241,12 @@ public interface ClientMethods extends Parameters {
/** /**
* Wait for all outstanding responses. * Wait for all outstanding responses.
* *
* @param maxWait maximum wait time * @param maxWaitTime maximum wait time
* @return this ingest * @return this ingest
* @throws InterruptedException if wait is interrupted * @throws InterruptedException if wait is interrupted
* @throws ExecutionException if execution failed * @throws ExecutionException if execution failed
*/ */
ClientMethods waitForResponses(TimeValue maxWait) throws InterruptedException, ExecutionException; ClientMethods waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException;
/** /**
* Refresh the index. * Refresh the index.
@ -281,7 +279,7 @@ public interface ClientMethods extends Parameters {
* @param timeValue time value * @param timeValue time value
* @throws IOException if wait failed * @throws IOException if wait failed
*/ */
void waitForCluster(String healthColor, TimeValue timeValue) throws IOException; void waitForCluster(String healthColor, String timeValue) throws IOException;
/** /**
* Get current health color. * Get current health color.

View file

@ -11,7 +11,7 @@ import org.xbib.elasticsearch.extras.client.transport.MockTransportClient;
/** /**
* *
*/ */
public final class ClientBuilder implements Parameters { public final class Clients implements Parameters {
private final Settings.Builder settingsBuilder; private final Settings.Builder settingsBuilder;
@ -19,55 +19,55 @@ public final class ClientBuilder implements Parameters {
private BulkControl control; private BulkControl control;
public ClientBuilder() { public Clients() {
settingsBuilder = Settings.builder(); settingsBuilder = Settings.builder();
} }
public static ClientBuilder builder() { public static Clients builder() {
return new ClientBuilder(); return new Clients();
} }
public ClientBuilder put(String key, String value) { public Clients put(String key, String value) {
settingsBuilder.put(key, value); settingsBuilder.put(key, value);
return this; return this;
} }
public ClientBuilder put(String key, Integer value) { public Clients put(String key, Integer value) {
settingsBuilder.put(key, value); settingsBuilder.put(key, value);
return this; return this;
} }
public ClientBuilder put(String key, Long value) { public Clients put(String key, Long value) {
settingsBuilder.put(key, value); settingsBuilder.put(key, value);
return this; return this;
} }
public ClientBuilder put(String key, Double value) { public Clients put(String key, Double value) {
settingsBuilder.put(key, value); settingsBuilder.put(key, value);
return this; return this;
} }
public ClientBuilder put(String key, ByteSizeValue value) { public Clients put(String key, ByteSizeValue value) {
settingsBuilder.put(key, value); settingsBuilder.put(key, value);
return this; return this;
} }
public ClientBuilder put(String key, TimeValue value) { public Clients put(String key, TimeValue value) {
settingsBuilder.put(key, value); settingsBuilder.put(key, value);
return this; return this;
} }
public ClientBuilder put(Settings settings) { public Clients put(Settings settings) {
settingsBuilder.put(settings); settingsBuilder.put(settings);
return this; return this;
} }
public ClientBuilder setMetric(BulkMetric metric) { public Clients setMetric(BulkMetric metric) {
this.metric = metric; this.metric = metric;
return this; return this;
} }
public ClientBuilder setControl(BulkControl control) { public Clients setControl(BulkControl control) {
this.control = control; this.control = control;
return this; return this;
} }
@ -77,8 +77,8 @@ public final class ClientBuilder implements Parameters {
return new BulkNodeClient() return new BulkNodeClient()
.maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST)) .maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST))
.maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS)) .maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS))
.maxVolumePerRequest(settings.getAsBytesSize(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) .maxVolumePerRequest(settings.get(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST))
.flushIngestInterval(settings.getAsTime(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) .flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL))
.init(client, metric, control); .init(client, metric, control);
} }
@ -87,8 +87,8 @@ public final class ClientBuilder implements Parameters {
return new BulkTransportClient() return new BulkTransportClient()
.maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST)) .maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST))
.maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS)) .maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS))
.maxVolumePerRequest(settings.getAsBytesSize(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) .maxVolumePerRequest(settings.get(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST))
.flushIngestInterval(settings.getAsTime(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) .flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL))
.init(settings, metric, control); .init(settings, metric, control);
} }
@ -97,8 +97,8 @@ public final class ClientBuilder implements Parameters {
return new MockTransportClient() return new MockTransportClient()
.maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST)) .maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST))
.maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS)) .maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS))
.maxVolumePerRequest(settings.getAsBytesSize(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) .maxVolumePerRequest(settings.get(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST))
.flushIngestInterval(settings.getAsTime(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) .flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL))
.init(settings, metric, control); .init(settings, metric, control);
} }

View file

@ -1,9 +1,5 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elasticsearch.extras.client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
/** /**
* *
*/ */
@ -13,9 +9,9 @@ public interface Parameters {
int DEFAULT_MAX_CONCURRENT_REQUESTS = Runtime.getRuntime().availableProcessors() * 4; int DEFAULT_MAX_CONCURRENT_REQUESTS = Runtime.getRuntime().availableProcessors() * 4;
ByteSizeValue DEFAULT_MAX_VOLUME_PER_REQUEST = new ByteSizeValue(10, ByteSizeUnit.MB); String DEFAULT_MAX_VOLUME_PER_REQUEST = "10mb";
TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(30); String DEFAULT_FLUSH_INTERVAL = "30s";
String MAX_ACTIONS_PER_REQUEST = "max_actions_per_request"; String MAX_ACTIONS_PER_REQUEST = "max_actions_per_request";

View file

@ -50,9 +50,9 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
private ByteSizeValue maxVolume = DEFAULT_MAX_VOLUME_PER_REQUEST; private ByteSizeValue maxVolume;
private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL; private TimeValue flushInterval;
private Node node; private Node node;
@ -81,14 +81,14 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
} }
@Override @Override
public BulkNodeClient maxVolumePerRequest(ByteSizeValue maxVolume) { public BulkNodeClient maxVolumePerRequest(String maxVolume) {
this.maxVolume = maxVolume; this.maxVolume = ByteSizeValue.parseBytesSizeValue(maxVolume, "maxVolumePerRequest");
return this; return this;
} }
@Override @Override
public BulkNodeClient flushIngestInterval(TimeValue flushInterval) { public BulkNodeClient flushIngestInterval(String flushInterval) {
this.flushInterval = flushInterval; this.flushInterval = TimeValue.parseTimeValue(flushInterval, TimeValue.timeValueSeconds(5), "flushIngestInterval");
return this; return this;
} }
@ -345,11 +345,12 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods {
} }
@Override @Override
public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException { public BulkNodeClient waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException {
if (closed) { if (closed) {
throwClose(); throwClose();
} }
while (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) { while (!bulkProcessor.awaitClose(TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(30),
"maxWaitTime").getMillis(), TimeUnit.MILLISECONDS)) {
logger.warn("still waiting for responses"); logger.warn("still waiting for responses");
} }
return this; return this;

View file

@ -55,9 +55,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
private ByteSizeValue maxVolumePerRequest = DEFAULT_MAX_VOLUME_PER_REQUEST; private ByteSizeValue maxVolumePerRequest;
private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL; private TimeValue flushInterval;
private BulkProcessor bulkProcessor; private BulkProcessor bulkProcessor;
@ -229,14 +229,14 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
} }
@Override @Override
public BulkTransportClient maxVolumePerRequest(ByteSizeValue maxVolumePerRequest) { public BulkTransportClient maxVolumePerRequest(String maxVolumePerRequest) {
this.maxVolumePerRequest = maxVolumePerRequest; this.maxVolumePerRequest = ByteSizeValue.parseBytesSizeValue(maxVolumePerRequest, "maxVolumePerRequest");
return this; return this;
} }
@Override @Override
public BulkTransportClient flushIngestInterval(TimeValue flushInterval) { public BulkTransportClient flushIngestInterval(String flushInterval) {
this.flushInterval = flushInterval; this.flushInterval = TimeValue.parseTimeValue(flushInterval, TimeValue.timeValueSeconds(5), "flushIngestInterval");
return this; return this;
} }
@ -439,12 +439,13 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods
} }
@Override @Override
public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) public synchronized BulkTransportClient waitForResponses(String maxWaitTime)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
if (closed) { if (closed) {
throwClose(); throwClose();
} }
bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); bulkProcessor.awaitClose(TimeValue.parseTimeValue(maxWaitTime,
TimeValue.timeValueSeconds(30), "maxWaitTime").getMillis(), TimeUnit.MILLISECONDS);
return this; return this;
} }

View file

@ -5,8 +5,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elasticsearch.extras.client.BulkControl; import org.xbib.elasticsearch.extras.client.BulkControl;
import org.xbib.elasticsearch.extras.client.BulkMetric; import org.xbib.elasticsearch.extras.client.BulkMetric;
@ -45,12 +43,12 @@ public class MockTransportClient extends BulkTransportClient {
} }
@Override @Override
public MockTransportClient maxVolumePerRequest(ByteSizeValue maxVolumePerRequest) { public MockTransportClient maxVolumePerRequest(String maxVolumePerRequest) {
return this; return this;
} }
@Override @Override
public MockTransportClient flushIngestInterval(TimeValue interval) { public MockTransportClient flushIngestInterval(String interval) {
return this; return this;
} }
@ -90,7 +88,7 @@ public class MockTransportClient extends BulkTransportClient {
} }
@Override @Override
public MockTransportClient waitForResponses(TimeValue timeValue) throws InterruptedException { public MockTransportClient waitForResponses(String timeValue) throws InterruptedException {
return this; return this;
} }
@ -135,7 +133,7 @@ public class MockTransportClient extends BulkTransportClient {
} }
@Override @Override
public void waitForCluster(String healthColor, TimeValue timeValue) throws IOException { public void waitForCluster(String healthColor, String timeValue) throws IOException {
// mockup method // mockup method
} }