diff --git a/build.gradle b/build.gradle index bc9dc21..2f8478a 100644 --- a/build.gradle +++ b/build.gradle @@ -1,13 +1,8 @@ plugins { id "org.sonarqube" version "2.2" - id "org.ajoberstar.github-pages" version "1.6.0-rc.1" - id "org.xbib.gradle.plugin.jbake" version "1.2.1" } -group = 'org.xbib' -version = '2.2.1.1' - printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" + "Build: group: ${project.group} name: ${project.name} version: ${project.version}\n", InetAddress.getLocalHost(), @@ -28,7 +23,6 @@ apply plugin: 'findbugs' apply plugin: 'pmd' apply plugin: 'checkstyle' apply plugin: "jacoco" -apply plugin: 'org.ajoberstar.github-pages' apply from: 'gradle/ext.gradle' @@ -45,9 +39,6 @@ sourceSets { } } -sourceCompatibility = JavaVersion.VERSION_1_8 -targetCompatibility = JavaVersion.VERSION_1_8 - configurations { wagon integrationTestCompile.extendsFrom testCompile @@ -56,7 +47,9 @@ configurations { dependencies { compile "org.xbib:metrics:1.0.0" - compile "org.elasticsearch:elasticsearch:2.2.1" + compile("org.elasticsearch:elasticsearch:2.2.1") { + exclude module: "securesm" + } testCompile "net.java.dev.jna:jna:4.1.0" testCompile "junit:junit:4.12" testCompile "org.apache.logging.log4j:log4j-core:2.7" @@ -64,9 +57,12 @@ dependencies { wagon 'org.apache.maven.wagon:wagon-ssh-external:2.10' } +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' tasks.withType(JavaCompile) { - options.compilerArgs << "-Xlint:all" << "-profile" << "compact3" + options.compilerArgs << "-Xlint:all" } task integrationTest(type: Test) { diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..3f32f32 --- /dev/null +++ b/gradle.properties @@ -0,0 +1,3 @@ +group = org.xbib +name = elasticsearch-extras-client +version = 2.2.1.2 diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 6ffa237..51288f9 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 2a06e59..27b5466 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Nov 01 14:46:00 CET 2016 +#Tue Jan 03 14:13:22 CET 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.2.1-all.zip diff --git a/gradlew b/gradlew index 9aa616c..4453cce 100755 --- a/gradlew +++ b/gradlew @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/usr/bin/env sh ############################################################################## ## @@ -154,16 +154,19 @@ if $cygwin ; then esac fi -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") +# Escape application args +save ( ) { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " } -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" # by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong -if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then cd "$(dirname "$0")" fi -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +exec "$JAVACMD" "$@" diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java index e01ca67..77b004f 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClientTest.java @@ -19,7 +19,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.junit.Before; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -51,8 +51,8 @@ public class BulkNodeClientTest extends NodeTestUtils { @Test public void testNewIndexNodeClient() throws Exception { - final BulkNodeClient client = ClientBuilder.builder() - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) + final BulkNodeClient client = Clients.builder() + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -66,8 +66,8 @@ public class BulkNodeClientTest extends NodeTestUtils { @Test public void testMappingNodeClient() throws Exception { - final BulkNodeClient client = ClientBuilder.builder() - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) + final BulkNodeClient client = Clients.builder() + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -96,9 +96,9 @@ public class BulkNodeClientTest extends NodeTestUtils { @Test public void testSingleDocNodeClient() { - final BulkNodeClient client = ClientBuilder.builder() - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(30)) + final BulkNodeClient client = Clients.builder() + .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(30)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -106,7 +106,7 @@ public class BulkNodeClientTest extends NodeTestUtils { client.newIndex("test"); client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); } catch (InterruptedException e) { // ignore } catch (NoNodeAvailableException e) { @@ -126,9 +126,9 @@ public class BulkNodeClientTest extends NodeTestUtils { @Test public void testRandomDocsNodeClient() throws Exception { long numactions = NUM_ACTIONS; - final BulkNodeClient client = ClientBuilder.builder() - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) + final BulkNodeClient client = Clients.builder() + .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -138,7 +138,7 @@ public class BulkNodeClientTest extends NodeTestUtils { client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { @@ -157,9 +157,9 @@ public class BulkNodeClientTest extends NodeTestUtils { Long maxactions = MAX_ACTIONS; final Long maxloop = NUM_ACTIONS; logger.info("NodeClient max={} maxactions={} maxloop={}", maxthreads, maxactions, maxloop); - final BulkNodeClient client = ClientBuilder.builder() - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))// disable auto flush for this test + final BulkNodeClient client = Clients.builder() + .put(Clients.MAX_ACTIONS_PER_REQUEST, maxactions) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))// disable auto flush for this test .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -183,7 +183,7 @@ public class BulkNodeClientTest extends NodeTestUtils { latch.await(30, TimeUnit.SECONDS); logger.info("flush..."); client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); logger.info("got all responses, thread pool shutdown..."); pool.shutdown(); logger.info("pool is shut down"); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java index 7d8ba1f..7c11526 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeDuplicateIDTest.java @@ -5,10 +5,9 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -29,8 +28,8 @@ public class BulkNodeDuplicateIDTest extends NodeTestUtils { @Test public void testDuplicateDocIDs() throws Exception { long numactions = NUM_ACTIONS; - final BulkNodeClient client = ClientBuilder.builder() - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) + final BulkNodeClient client = Clients.builder() + .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -40,7 +39,7 @@ public class BulkNodeDuplicateIDTest extends NodeTestUtils { client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) .setIndices("test") diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java index d4b19b0..eb5256c 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeIndexAliasTest.java @@ -8,7 +8,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.IndexAliasAdder; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -28,7 +28,7 @@ public class BulkNodeIndexAliasTest extends NodeTestUtils { @Test public void testIndexAlias() throws Exception { - final BulkNodeClient client = ClientBuilder.builder() + final BulkNodeClient client = Clients.builder() .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -66,7 +66,7 @@ public class BulkNodeIndexAliasTest extends NodeTestUtils { } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); client.shutdown(); if (client.hasThrowable()) { logger.error("error", client.getThrowable()); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java index 93141e1..b4fec6b 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeReplicaTest.java @@ -1,17 +1,21 @@ package org.xbib.elasticsearch.extras.client.node; -import org.elasticsearch.action.admin.indices.stats.*; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.indexing.IndexingStats; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -43,7 +47,7 @@ public class BulkNodeReplicaTest extends NodeTestUtils { .put("index.number_of_replicas", 1) .build(); - final BulkNodeClient client = ClientBuilder.builder() + final BulkNodeClient client = Clients.builder() .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); @@ -51,7 +55,7 @@ public class BulkNodeReplicaTest extends NodeTestUtils { try { client.newIndex("test1", settingsTest1, null) .newIndex("test2", settingsTest2, null); - client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); + client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 1234; i++) { client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } @@ -59,7 +63,7 @@ public class BulkNodeReplicaTest extends NodeTestUtils { client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(60)); + client.waitForResponses("30s"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java index b1c88fe..5dc9202 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/node/BulkNodeUpdateReplicaLevelTest.java @@ -4,10 +4,9 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -38,19 +37,19 @@ public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils { .put("index.number_of_replicas", 0) .build(); - final BulkNodeClient client = ClientBuilder.builder() + final BulkNodeClient client = Clients.builder() .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkNodeClient(client("1")); try { client.newIndex("replicatest", settings, null); - client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); + client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 12345; i++) { client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel); assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); } catch (NoNodeAvailableException e) { diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java index 0a35742..c7c82e0 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClientTest.java @@ -12,7 +12,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.junit.Before; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -48,9 +48,9 @@ public class BulkTransportClientTest extends NodeTestUtils { @Test public void testBulkClient() throws IOException { - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); @@ -76,10 +76,10 @@ public class BulkTransportClientTest extends NodeTestUtils { @Test public void testSingleDocBulkClient() throws IOException { - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) + .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); @@ -87,7 +87,7 @@ public class BulkTransportClientTest extends NodeTestUtils { client.newIndex("test"); client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); } catch (InterruptedException e) { // ignore } catch (ExecutionException e) { @@ -107,10 +107,10 @@ public class BulkTransportClientTest extends NodeTestUtils { @Test public void testRandomDocsBulkClient() throws IOException { long numactions = NUM_ACTIONS; - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) + .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); @@ -120,7 +120,7 @@ public class BulkTransportClientTest extends NodeTestUtils { client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); } catch (InterruptedException e) { // ignore } catch (ExecutionException e) { @@ -148,10 +148,10 @@ public class BulkTransportClientTest extends NodeTestUtils { .put("index.number_of_replicas", 1) .build(); - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, maxactions) - .put(ClientBuilder.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test + .put(Clients.MAX_ACTIONS_PER_REQUEST, maxactions) + .put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); @@ -173,7 +173,7 @@ public class BulkTransportClientTest extends NodeTestUtils { latch.await(30, TimeUnit.SECONDS); logger.info("client flush ..."); client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); logger.info("thread pool to be shut down ..."); pool.shutdown(); logger.info("poot shut down"); diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java index 00a4066..c087601 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportDuplicateIDTest.java @@ -5,10 +5,9 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -26,9 +25,9 @@ public class BulkTransportDuplicateIDTest extends NodeTestUtils { @Test public void testDuplicateDocIDs() throws Exception { long numactions = NUM_ACTIONS; - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) - .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) + .put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) .toBulkTransportClient(); @@ -38,7 +37,7 @@ public class BulkTransportDuplicateIDTest extends NodeTestUtils { client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); client.refreshIndex("test"); SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) .setIndices("test") diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java index 119688e..bc8f449 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportReplicaTest.java @@ -1,17 +1,21 @@ package org.xbib.elasticsearch.extras.client.transport; -import org.elasticsearch.action.admin.indices.stats.*; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.indexing.IndexingStats; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -46,7 +50,7 @@ public class BulkTransportReplicaTest extends NodeTestUtils { .put("index.number_of_replicas", 1) .build(); - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) @@ -54,7 +58,7 @@ public class BulkTransportReplicaTest extends NodeTestUtils { try { client.newIndex("test1", settingsTest1, null) .newIndex("test2", settingsTest2, null); - client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); + client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 1234; i++) { client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } @@ -62,7 +66,7 @@ public class BulkTransportReplicaTest extends NodeTestUtils { client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(60)); + client.waitForResponses("30s"); } catch (NoNodeAvailableException e) { logger.warn("skipping, no node available"); } finally { diff --git a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java index 8ed2c4a..1f56df8 100644 --- a/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java +++ b/src/integration-test/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportUpdateReplicaLevelTest.java @@ -4,10 +4,9 @@ import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.junit.Test; import org.xbib.elasticsearch.NodeTestUtils; -import org.xbib.elasticsearch.extras.client.ClientBuilder; +import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elasticsearch.extras.client.SimpleBulkControl; import org.xbib.elasticsearch.extras.client.SimpleBulkMetric; @@ -39,7 +38,7 @@ public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils { .put("index.number_of_replicas", 0) .build(); - final BulkTransportClient client = ClientBuilder.builder() + final BulkTransportClient client = Clients.builder() .put(getSettings()) .setMetric(new SimpleBulkMetric()) .setControl(new SimpleBulkControl()) @@ -47,12 +46,12 @@ public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils { try { client.newIndex("replicatest", settings, null); - client.waitForCluster("GREEN", TimeValue.timeValueSeconds(30)); + client.waitForCluster("GREEN", "30s"); for (int i = 0; i < 12345; i++) { client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}"); } client.flushIngest(); - client.waitForResponses(TimeValue.timeValueSeconds(30)); + client.waitForResponses("30s"); shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel); assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1)); } catch (NoNodeAvailableException e) { diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java index 877067b..aed7be0 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/AbstractClient.java @@ -194,7 +194,7 @@ public abstract class AbstractClient { return shards; } - public void waitForCluster(String statusString, TimeValue timeout) throws IOException { + public void waitForCluster(String statusString, String timeout) throws IOException { if (client() == null) { return; } @@ -255,7 +255,7 @@ public abstract class AbstractClient { } public int updateReplicaLevel(String index, int level) throws IOException { - waitForCluster("YELLOW", TimeValue.timeValueSeconds(30)); + waitForCluster("YELLOW", "30s"); updateIndexSetting(index, "number_of_replicas", level); return waitForRecovery(index); } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java index 814a7c7..b32637e 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/BulkProcessor.java @@ -8,8 +8,6 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -48,8 +46,8 @@ public class BulkProcessor implements Closeable { private volatile boolean closed = false; - private BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, - int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) { + private BulkProcessor(Client client, Listener listener, String name, int concurrentRequests, + int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.bytes(); @@ -159,35 +157,23 @@ public class BulkProcessor implements Closeable { * @param payload payload * @return his bulk processor */ - public BulkProcessor add(ActionRequest request, @Nullable Object payload) { + public BulkProcessor add(ActionRequest request, Object payload) { internalAdd(request, payload); return this; } - protected void ensureOpen() { + private void ensureOpen() { if (closed) { throw new IllegalStateException("bulk process already closed"); } } - private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) { + private synchronized void internalAdd(ActionRequest request, Object payload) { ensureOpen(); bulkRequest.add(request, payload); executeIfNeeded(); } - public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) - throws Exception { - return add(data, defaultIndex, defaultType, null); - } - - public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, - @Nullable String defaultType, @Nullable Object payload) throws Exception { - bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true); - executeIfNeeded(); - return this; - } - private void executeIfNeeded() { ensureOpen(); if (!isOverTheLimit()) { diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java b/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java index 74de495..a683b63 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/ClientMethods.java @@ -5,8 +5,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.io.InputStream; @@ -102,7 +100,7 @@ public interface ClientMethods extends Parameters { * @param maxVolume maximum volume * @return this ingest */ - ClientMethods maxVolumePerRequest(ByteSizeValue maxVolume); + ClientMethods maxVolumePerRequest(String maxVolume); /** * Set the flush interval for automatic flushing outstanding ingest requests. @@ -110,7 +108,7 @@ public interface ClientMethods extends Parameters { * @param flushInterval the flush interval, default is 30 seconds * @return this ingest */ - ClientMethods flushIngestInterval(TimeValue flushInterval); + ClientMethods flushIngestInterval(String flushInterval); /** * Set mapping. @@ -243,12 +241,12 @@ public interface ClientMethods extends Parameters { /** * Wait for all outstanding responses. * - * @param maxWait maximum wait time + * @param maxWaitTime maximum wait time * @return this ingest * @throws InterruptedException if wait is interrupted * @throws ExecutionException if execution failed */ - ClientMethods waitForResponses(TimeValue maxWait) throws InterruptedException, ExecutionException; + ClientMethods waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException; /** * Refresh the index. @@ -281,7 +279,7 @@ public interface ClientMethods extends Parameters { * @param timeValue time value * @throws IOException if wait failed */ - void waitForCluster(String healthColor, TimeValue timeValue) throws IOException; + void waitForCluster(String healthColor, String timeValue) throws IOException; /** * Get current health color. diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/ClientBuilder.java b/src/main/java/org/xbib/elasticsearch/extras/client/Clients.java similarity index 65% rename from src/main/java/org/xbib/elasticsearch/extras/client/ClientBuilder.java rename to src/main/java/org/xbib/elasticsearch/extras/client/Clients.java index 4089249..daa4981 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/ClientBuilder.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/Clients.java @@ -11,7 +11,7 @@ import org.xbib.elasticsearch.extras.client.transport.MockTransportClient; /** * */ -public final class ClientBuilder implements Parameters { +public final class Clients implements Parameters { private final Settings.Builder settingsBuilder; @@ -19,55 +19,55 @@ public final class ClientBuilder implements Parameters { private BulkControl control; - public ClientBuilder() { + public Clients() { settingsBuilder = Settings.builder(); } - public static ClientBuilder builder() { - return new ClientBuilder(); + public static Clients builder() { + return new Clients(); } - public ClientBuilder put(String key, String value) { + public Clients put(String key, String value) { settingsBuilder.put(key, value); return this; } - public ClientBuilder put(String key, Integer value) { + public Clients put(String key, Integer value) { settingsBuilder.put(key, value); return this; } - public ClientBuilder put(String key, Long value) { + public Clients put(String key, Long value) { settingsBuilder.put(key, value); return this; } - public ClientBuilder put(String key, Double value) { + public Clients put(String key, Double value) { settingsBuilder.put(key, value); return this; } - public ClientBuilder put(String key, ByteSizeValue value) { + public Clients put(String key, ByteSizeValue value) { settingsBuilder.put(key, value); return this; } - public ClientBuilder put(String key, TimeValue value) { + public Clients put(String key, TimeValue value) { settingsBuilder.put(key, value); return this; } - public ClientBuilder put(Settings settings) { + public Clients put(Settings settings) { settingsBuilder.put(settings); return this; } - public ClientBuilder setMetric(BulkMetric metric) { + public Clients setMetric(BulkMetric metric) { this.metric = metric; return this; } - public ClientBuilder setControl(BulkControl control) { + public Clients setControl(BulkControl control) { this.control = control; return this; } @@ -77,8 +77,8 @@ public final class ClientBuilder implements Parameters { return new BulkNodeClient() .maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST)) .maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS)) - .maxVolumePerRequest(settings.getAsBytesSize(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) - .flushIngestInterval(settings.getAsTime(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) + .maxVolumePerRequest(settings.get(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) + .flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) .init(client, metric, control); } @@ -87,8 +87,8 @@ public final class ClientBuilder implements Parameters { return new BulkTransportClient() .maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST)) .maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS)) - .maxVolumePerRequest(settings.getAsBytesSize(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) - .flushIngestInterval(settings.getAsTime(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) + .maxVolumePerRequest(settings.get(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) + .flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) .init(settings, metric, control); } @@ -97,8 +97,8 @@ public final class ClientBuilder implements Parameters { return new MockTransportClient() .maxActionsPerRequest(settings.getAsInt(MAX_ACTIONS_PER_REQUEST, DEFAULT_MAX_ACTIONS_PER_REQUEST)) .maxConcurrentRequests(settings.getAsInt(MAX_CONCURRENT_REQUESTS, DEFAULT_MAX_CONCURRENT_REQUESTS)) - .maxVolumePerRequest(settings.getAsBytesSize(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) - .flushIngestInterval(settings.getAsTime(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) + .maxVolumePerRequest(settings.get(MAX_VOLUME_PER_REQUEST, DEFAULT_MAX_VOLUME_PER_REQUEST)) + .flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL)) .init(settings, metric, control); } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/Parameters.java b/src/main/java/org/xbib/elasticsearch/extras/client/Parameters.java index 41cc6d2..d77ce24 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/Parameters.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/Parameters.java @@ -1,9 +1,5 @@ package org.xbib.elasticsearch.extras.client; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; - /** * */ @@ -13,9 +9,9 @@ public interface Parameters { int DEFAULT_MAX_CONCURRENT_REQUESTS = Runtime.getRuntime().availableProcessors() * 4; - ByteSizeValue DEFAULT_MAX_VOLUME_PER_REQUEST = new ByteSizeValue(10, ByteSizeUnit.MB); + String DEFAULT_MAX_VOLUME_PER_REQUEST = "10mb"; - TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(30); + String DEFAULT_FLUSH_INTERVAL = "30s"; String MAX_ACTIONS_PER_REQUEST = "max_actions_per_request"; diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java index 74a8dc4..0f387b6 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/node/BulkNodeClient.java @@ -50,9 +50,9 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; - private ByteSizeValue maxVolume = DEFAULT_MAX_VOLUME_PER_REQUEST; + private ByteSizeValue maxVolume; - private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL; + private TimeValue flushInterval; private Node node; @@ -81,14 +81,14 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { } @Override - public BulkNodeClient maxVolumePerRequest(ByteSizeValue maxVolume) { - this.maxVolume = maxVolume; + public BulkNodeClient maxVolumePerRequest(String maxVolume) { + this.maxVolume = ByteSizeValue.parseBytesSizeValue(maxVolume, "maxVolumePerRequest"); return this; } @Override - public BulkNodeClient flushIngestInterval(TimeValue flushInterval) { - this.flushInterval = flushInterval; + public BulkNodeClient flushIngestInterval(String flushInterval) { + this.flushInterval = TimeValue.parseTimeValue(flushInterval, TimeValue.timeValueSeconds(5), "flushIngestInterval"); return this; } @@ -345,11 +345,12 @@ public class BulkNodeClient extends AbstractClient implements ClientMethods { } @Override - public BulkNodeClient waitForResponses(TimeValue maxWaitTime) throws InterruptedException, ExecutionException { + public BulkNodeClient waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException { if (closed) { throwClose(); } - while (!bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS)) { + while (!bulkProcessor.awaitClose(TimeValue.parseTimeValue(maxWaitTime, TimeValue.timeValueSeconds(30), + "maxWaitTime").getMillis(), TimeUnit.MILLISECONDS)) { logger.warn("still waiting for responses"); } return this; diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java index b03aeef..ac37781 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/BulkTransportClient.java @@ -55,9 +55,9 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; - private ByteSizeValue maxVolumePerRequest = DEFAULT_MAX_VOLUME_PER_REQUEST; + private ByteSizeValue maxVolumePerRequest; - private TimeValue flushInterval = DEFAULT_FLUSH_INTERVAL; + private TimeValue flushInterval; private BulkProcessor bulkProcessor; @@ -229,14 +229,14 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods } @Override - public BulkTransportClient maxVolumePerRequest(ByteSizeValue maxVolumePerRequest) { - this.maxVolumePerRequest = maxVolumePerRequest; + public BulkTransportClient maxVolumePerRequest(String maxVolumePerRequest) { + this.maxVolumePerRequest = ByteSizeValue.parseBytesSizeValue(maxVolumePerRequest, "maxVolumePerRequest"); return this; } @Override - public BulkTransportClient flushIngestInterval(TimeValue flushInterval) { - this.flushInterval = flushInterval; + public BulkTransportClient flushIngestInterval(String flushInterval) { + this.flushInterval = TimeValue.parseTimeValue(flushInterval, TimeValue.timeValueSeconds(5), "flushIngestInterval"); return this; } @@ -439,12 +439,13 @@ public class BulkTransportClient extends AbstractClient implements ClientMethods } @Override - public synchronized BulkTransportClient waitForResponses(TimeValue maxWaitTime) + public synchronized BulkTransportClient waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException { if (closed) { throwClose(); } - bulkProcessor.awaitClose(maxWaitTime.getMillis(), TimeUnit.MILLISECONDS); + bulkProcessor.awaitClose(TimeValue.parseTimeValue(maxWaitTime, + TimeValue.timeValueSeconds(30), "maxWaitTime").getMillis(), TimeUnit.MILLISECONDS); return this; } diff --git a/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java b/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java index 86199d2..ed0fcc7 100644 --- a/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java +++ b/src/main/java/org/xbib/elasticsearch/extras/client/transport/MockTransportClient.java @@ -5,8 +5,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.xbib.elasticsearch.extras.client.BulkControl; import org.xbib.elasticsearch.extras.client.BulkMetric; @@ -45,12 +43,12 @@ public class MockTransportClient extends BulkTransportClient { } @Override - public MockTransportClient maxVolumePerRequest(ByteSizeValue maxVolumePerRequest) { + public MockTransportClient maxVolumePerRequest(String maxVolumePerRequest) { return this; } @Override - public MockTransportClient flushIngestInterval(TimeValue interval) { + public MockTransportClient flushIngestInterval(String interval) { return this; } @@ -90,7 +88,7 @@ public class MockTransportClient extends BulkTransportClient { } @Override - public MockTransportClient waitForResponses(TimeValue timeValue) throws InterruptedException { + public MockTransportClient waitForResponses(String timeValue) throws InterruptedException { return this; } @@ -135,7 +133,7 @@ public class MockTransportClient extends BulkTransportClient { } @Override - public void waitForCluster(String healthColor, TimeValue timeValue) throws IOException { + public void waitForCluster(String healthColor, String timeValue) throws IOException { // mockup method }