large refactoring, new subpackage structure, Gradle 5.2.1, Java 11

This commit is contained in:
Jörg Prante 2019-02-18 17:21:57 +01:00
parent 17d114feb1
commit c0dfb9a617
116 changed files with 14232 additions and 2861 deletions

.gitignore vendored
@ -4,10 +4,10 @@
/.idea /.idea
/target /target
.DS_Store .DS_Store
/.settings /.settings
/.classpath /.classpath
/.project /.project
/.gradle /.gradle
/build build
/plugins *.iml

@ -1,9 +1,11 @@
plugins { plugins {
id "org.sonarqube" version "2.2" id "org.sonarqube" version "2.6.1"
id "" version "0.11.0"
id "com.github.spotbugs" version "1.6.9"
id "org.xbib.gradle.plugin.asciidoctor" version ""
} }
printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" + printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGradle: %s Groovy: %s Java: %s\n" +
"Build: group: ${} name: ${} version: ${project.version}\n", "Build: group: ${} name: ${} version: ${project.version}\n",
InetAddress.getLocalHost(), InetAddress.getLocalHost(),
System.getProperty(""), System.getProperty(""),
@ -13,90 +15,81 @@ printf "Host: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGroovy: %s\nGradle: %s\n" +
System.getProperty("java.vm.version"), System.getProperty("java.vm.version"),
System.getProperty("java.vm.vendor"), System.getProperty("java.vm.vendor"),
System.getProperty(""), System.getProperty(""),
GroovySystem.getVersion(), GroovySystem.getVersion(),
gradle.gradleVersion JavaVersion.current()
if (JavaVersion.current() < JavaVersion.VERSION_11) {
throw new GradleException("This build must be run with java 11 or higher")
subprojects {
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'maven' apply plugin: 'maven'
apply plugin: 'signing' apply plugin: 'signing'
apply plugin: 'findbugs' apply plugin: 'com.github.spotbugs'
apply plugin: 'pmd' apply plugin: 'pmd'
apply plugin: 'checkstyle' apply plugin: 'checkstyle'
apply plugin: "jacoco" apply plugin: 'org.xbib.gradle.plugin.asciidoctor'
apply from: 'gradle/ext.gradle'
sourceSets {
integrationTest {
java {
srcDir file('src/integration-test/java')
compileClasspath += main.output
compileClasspath += test.output
resources {
srcDir file('src/integration-test/resources')
configurations { configurations {
wagon wagon
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
} }
dependencies { dependencies {
compile "org.xbib:metrics:1.0.0" testCompile "junit:junit:${'junit.version')}"
compile("org.elasticsearch:elasticsearch:2.2.1") { testCompile "org.apache.logging.log4j:log4j-core:${'log4j.version')}"
exclude module: "securesm" testCompile "org.apache.logging.log4j:log4j-slf4j-impl:${'log4j.version')}"
} wagon "org.apache.maven.wagon:wagon-ssh:${'wagon.version')}"
testCompile ""
testCompile "junit:junit:4.12"
testCompile "org.apache.logging.log4j:log4j-core:2.7"
testCompile "org.apache.logging.log4j:log4j-slf4j-impl:2.7"
wagon 'org.apache.maven.wagon:wagon-ssh-external:2.10'
} }
sourceCompatibility = JavaVersion.VERSION_1_8 sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
tasks.withType(JavaCompile) { tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:all" options.compilerArgs << "-Xlint:all"
if (!options.compilerArgs.contains("-processor")) {
options.compilerArgs << '-proc:none'
} }
task integrationTest(type: Test) { test {
include '**/MiscTestSuite.class' jvmArgs =[
include '**/BulkNodeTestSuite.class' '--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
include '**/BulkTransportTestSuite.class' '--add-opens=java.base/java.nio=ALL-UNNAMED'
testClassesDir = sourceSets.integrationTest.output.classesDir ]
classpath = configurations.integrationTestCompile systemProperty 'jna.debug_load', 'true'
classpath += configurations.integrationTestRuntime testLogging {
classpath += sourceSets.main.output showStandardStreams = true
classpath += sourceSets.test.output exceptionFormat = 'full'
classpath += sourceSets.integrationTest.output }
outputs.upToDateWhen { false }
systemProperty 'path.home', projectDir.absolutePath
testLogging.showStandardStreams = true
} }
integrationTest.mustRunAfter test
check.dependsOn integrationTest
clean { clean {
delete "plugins" delete "plugins"
delete "logs" delete "logs"
delete "out"
} }
task javadocJar(type: Jar, dependsOn: classes) { /*javadoc {
from javadoc options.docletpath = configurations.asciidoclet.files.asType(List)
into "build/tmp" options.doclet = 'org.asciidoctor.Asciidoclet'
options.overview = "src/docs/asciidoclet/overview.adoc"
options.addStringOption "-base-dir", "${projectDir}"
options.addStringOption "-attribute",
configure(options) {
noTimestamp = true
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier 'javadoc' classifier 'javadoc'
} }
task sourcesJar(type: Jar, dependsOn: classes) { task sourcesJar(type: Jar, dependsOn: classes) {
from sourceSets.main.allSource from sourceSets.main.allSource
into "build/tmp"
classifier 'sources' classifier 'sources'
} }
@ -110,5 +103,55 @@ if (project.hasProperty('signing.keyId')) {
} }
} }
apply from: 'gradle/publish.gradle' apply from: "${rootProject.projectDir}/gradle/publish.gradle"
apply from: 'gradle/sonarqube.gradle'
spotbugs {
effort = "max"
reportLevel = "low"
//includeFilter = file("findbugs-exclude.xml")
tasks.withType(com.github.spotbugs.SpotBugsTask) {
ignoreFailures = true
reports {
xml.enabled = false
html.enabled = true
tasks.withType(Pmd) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
tasks.withType(Checkstyle) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
pmd {
toolVersion = '6.11.0'
ruleSets = ['category/java/bestpractices.xml']
checkstyle {
configFile = rootProject.file('config/checkstyle/checkstyle.xml')
ignoreFailures = true
showViolations = true
sonarqube {
properties {
property "sonar.projectName", "${} ${}"
property "sonar.sourceEncoding", "UTF-8"
property "sonar.tests", "src/test/java"
property "sonar.scm.provider", "git"
property "sonar.junit.reportsPath", "build/test-results/test/"

@ -0,0 +1,13 @@
# BSD-style license; for more info see

@ -0,0 +1,144 @@
<?xml version="1.0"?>
<ruleset name="Documentation"
Rules that are related to code documentation.
<rule name="CommentContent"
message="Invalid words or phrases found"
A rule for the politically correct... we don't want to offend anyone.
//OMG, this is horrible, Bob is an idiot !!!
<rule name="CommentRequired"
message="Comment is required"
Denotes whether comments are required (or unwanted) for specific language elements.
* @author Jon Doe
<rule name="CommentSize"
message="Comment is too large"
Determines whether the dimensions of non-header comments found are within the specified limits.
* too many lines!
<rule name="UncommentedEmptyConstructor"
message="Document empty constructor"
Uncommented Empty Constructor finds instances where a constructor does not
contain statements, but there is no comment. By explicitly commenting empty
constructors it is easier to distinguish between intentional (commented)
and unintentional empty constructors.
<property name="xpath">
[count(BlockStatement) = 0 and ($ignoreExplicitConstructorInvocation = 'true' or not(ExplicitConstructorInvocation)) and @containsComment = 'false']
<property name="ignoreExplicitConstructorInvocation" type="Boolean" description="Ignore explicit constructor invocation when deciding whether constructor is empty or not" value="false"/>
public Foo() {
// This constructor is intentionally empty. Nothing special is needed here.
<rule name="UncommentedEmptyMethodBody"
message="Document empty method body"
Uncommented Empty Method Body finds instances where a method body does not contain
statements, but there is no comment. By explicitly commenting empty method bodies
it is easier to distinguish between intentional (commented) and unintentional
empty methods.
<property name="xpath">
//MethodDeclaration/Block[count(BlockStatement) = 0 and @containsComment = 'false']
public void doSomething() {

@ -0,0 +1,393 @@
<?xml version="1.0"?>
<ruleset name="Multithreading"
Rules that flag issues when dealing with multiple threads of execution.
<rule name="AvoidSynchronizedAtMethodLevel"
message="Use block level rather than method level synchronization"
Method-level synchronization can cause problems when new code is added to the method.
Block-level synchronization helps to ensure that only the code that needs synchronization
gets it.
<property name="xpath">
public class Foo {
// Try to avoid this:
synchronized void foo() {
// Prefer this:
void bar() {
synchronized(this) {
// Try to avoid this for static methods:
static synchronized void fooStatic() {
// Prefer this:
static void barStatic() {
synchronized(Foo.class) {
<rule name="AvoidThreadGroup"
message="Avoid using java.lang.ThreadGroup; it is not thread safe"
Avoid using java.lang.ThreadGroup; although it is intended to be used in a threaded environment
it contains methods that are not thread-safe.
<property name="xpath">
//PrimarySuffix[contains(@Image, 'getThreadGroup')]
public class Bar {
void buz() {
ThreadGroup tg = new ThreadGroup("My threadgroup");
tg = new ThreadGroup(tg, "my thread group");
tg = Thread.currentThread().getThreadGroup();
tg = System.getSecurityManager().getThreadGroup();
<rule name="AvoidUsingVolatile"
message="Use of modifier volatile is not recommended."
Use of the keyword 'volatile' is generally used to fine tune a Java application, and therefore, requires
a good expertise of the Java Memory Model. Moreover, its range of action is somewhat misknown. Therefore,
the volatile keyword should not be used for maintenance purpose and portability.
<property name="xpath">
public class ThrDeux {
private volatile String var1; // not suggested
private String var2; // preferred
<rule name="DoNotUseThreads"
message="To be compliant to J2EE, a webapp should not use any thread."
The J2EE specification explicitly forbids the use of threads.
<property name="xpath">
<value>//ClassOrInterfaceType[@Image = 'Thread' or @Image = 'Runnable']</value>
// This is not allowed
public class UsingThread extends Thread {
// Neither this,
public class OtherThread implements Runnable {
// Nor this ...
public void methode() {
Runnable thread = new Thread();;
<rule name="DontCallThreadRun"
message="Don't call explicitly, use Thread.start()"
Explicitly calling method will execute in the caller's thread of control. Instead, call Thread.start() for the intended behavior.
<property name="xpath">
./Name[ends-with(@Image, '.run') or @Image = 'run']
and substring-before(Name/@Image, '.') =//VariableDeclarator/VariableDeclaratorId/@Image
or (./AllocationExpression/ClassOrInterfaceType[pmd-java:typeIs('java.lang.Thread')]
and ../PrimarySuffix[@Image = 'run'])
Thread t = new Thread();; // use t.start() instead
new Thread().run(); // same violation
<rule name="DoubleCheckedLocking"
message="Double checked locking is not thread safe in Java."
Partially created objects can be returned by the Double Checked Locking pattern when used in Java.
An optimizing JRE may assign a reference to the baz variable before it calls the constructor of the object the
reference points to.
Note: With Java 5, you can make Double checked locking work, if you declare the variable to be `volatile`.
For more details refer to: &lt;>
or &lt;>
public class Foo {
/*volatile */ Object baz = null; // fix for Java5 and later: volatile
Object bar() {
if (baz == null) { // baz may be non-null yet not fully created
synchronized(this) {
if (baz == null) {
baz = new Object();
return baz;
<rule name="NonThreadSafeSingleton"
message="Singleton is not thread safe"
Non-thread safe singletons can result in bad state changes. Eliminate
static singletons if possible by instantiating the object directly. Static
singletons are usually not needed as only a single instance exists anyway.
Other possible fixes are to synchronize the entire method or to use an
[initialize-on-demand holder class](
Refrain from using the double-checked locking pattern. The Java Memory Model doesn't
guarantee it to work unless the variable is declared as `volatile`, adding an uneeded
performance penalty. [Reference](
See Effective Java, item 48.
private static Foo foo = null;
//multiple simultaneous callers may see partially initialized objects
public static Foo getFoo() {
if (foo==null) {
foo = new Foo();
return foo;
<rule name="UnsynchronizedStaticDateFormatter"
message="Static DateFormatter objects should be accessed in a synchronized manner"
SimpleDateFormat instances are not synchronized. Sun recommends using separate format instances
for each thread. If multiple threads must access a static formatter, the formatter must be
synchronized either on method or block level.
This rule has been deprecated in favor of the rule {% rule UnsynchronizedStaticFormatter %}.
public class Foo {
private static final SimpleDateFormat sdf = new SimpleDateFormat();
void bar() {
sdf.format(); // poor, no thread-safety
synchronized void foo() {
sdf.format(); // preferred
<rule name="UnsynchronizedStaticFormatter"
message="Static Formatter objects should be accessed in a synchronized manner"
Instances of `java.text.Format` are generally not synchronized.
Sun recommends using separate format instances for each thread.
If multiple threads must access a static formatter, the formatter must be
synchronized either on method or block level.
public class Foo {
private static final SimpleDateFormat sdf = new SimpleDateFormat();
void bar() {
sdf.format(); // poor, no thread-safety
synchronized void foo() {
sdf.format(); // preferred
<rule name="UseConcurrentHashMap"
message="If you run in Java5 or newer and have concurrent access, you should use the ConcurrentHashMap implementation"
Since Java5 brought a new implementation of the Map designed for multi-threaded access, you can
perform efficient map reads without blocking other threads.
<property name="xpath">
//Type[../VariableDeclarator/VariableInitializer//AllocationExpression/ClassOrInterfaceType[@Image != 'ConcurrentHashMap']]
/ReferenceType/ClassOrInterfaceType[@Image = 'Map']
public class ConcurrentApp {
public void getMyInstance() {
Map map1 = new HashMap(); // fine for single-threaded access
Map map2 = new ConcurrentHashMap(); // preferred for use with multiple threads
// the following case will be ignored by this rule
Map map3 = someModule.methodThatReturnMap(); // might be OK, if the returned map is already thread-safe
<rule name="UseNotifyAllInsteadOfNotify"
message="Call Thread.notifyAll() rather than Thread.notify()"
Thread.notify() awakens a thread monitoring the object. If more than one thread is monitoring, then only
one is chosen. The thread chosen is arbitrary; thus its usually safer to call notifyAll() instead.
<property name="xpath">
[PrimarySuffix/Arguments[@ArgumentCount = '0']]
./Name[@Image='notify' or ends-with(@Image,'.notify')]
or ../PrimarySuffix/@Image='notify'
or (./AllocationExpression and ../PrimarySuffix[@Image='notify'])
void bar() {
// If many threads are monitoring x, only one (and you won't know which) will be notified.
// use instead:

@ -0,0 +1,65 @@
<?xml version="1.0"?>
<ruleset name="Security" xmlns=""
Rules that flag potential security flaws.
<rule name="HardCodedCryptoKey"
message="Do not use hard coded encryption keys"
Do not use hard coded values for cryptographic operations. Please store keys outside of source code.
public class Foo {
void good() {
SecretKeySpec secretKeySpec = new SecretKeySpec(Properties.getKey(), "AES");
void bad() {
SecretKeySpec secretKeySpec = new SecretKeySpec("my secret here".getBytes(), "AES");
<rule name="InsecureCryptoIv"
message="Do not use hard coded initialization vector in crypto operations"
Do not use hard coded initialization vector in cryptographic operations. Please use a randomly generated IV.
public class Foo {
void good() {
SecureRandom random = new SecureRandom();
byte iv[] = new byte[16];
void bad() {
byte[] iv = new byte[] { 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, };
void alsoBad() {
byte[] iv = "secret iv in here".getBytes();

View file

@ -0,0 +1,19 @@
dependencies {
compile "org.xbib:metrics:${'xbib-metrics.version')}"
compile("org.elasticsearch:elasticsearch:${'elasticsearch.version')}") {
// exclude ES jackson yaml, cbor, smile versions
exclude group: 'com.fasterxml.jackson.dataformat'
// dependencies that are not meant for client
exclude module: 'securesm'
// we use log4j2, not log4j
exclude group: 'log4j'
// override log4j2 of Elastic with ours
compile "org.apache.logging.log4j:log4j-core:${'log4j.version')}"
// for Elasticsearch session, ES uses SMILE when encoding source for SearchRequest
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${'jackson-dataformat.version')}"
// CBOR ist default JSON content compression encoding in ES 2.2.1
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${'jackson-dataformat.version')}"
// not used, but maybe in other projects
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${'jackson-dataformat.version')}"

elx-api/build.gradle~ Normal file
@ -0,0 +1,18 @@
dependencies {
compile("org.elasticsearch.client:transport:${'elasticsearch.version')}") {
exclude group: 'org.elasticsearch', module: 'securesm'
exclude group: 'org.elasticsearch.plugin', module: 'transport-netty3-client'
exclude group: 'org.elasticsearch.plugin', module: 'reindex-client'
exclude group: 'org.elasticsearch.plugin', module: 'percolator-client'
exclude group: 'org.elasticsearch.plugin', module: 'lang-mustache-client'
// we try to override the Elasticsearch netty by our netty version which might be more recent
compile "io.netty:netty-buffer:${'netty.version')}"
compile "io.netty:netty-codec-http:${'netty.version')}"
compile "io.netty:netty-handler:${'netty.version')}"
jar {
baseName "${}-api"

View file

@ -1,10 +1,8 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.api;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public interface BulkControl { public interface BulkControl {
void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval); void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval);
@ -18,5 +16,4 @@ public interface BulkControl {
Map<String, Long> getStartBulkRefreshIntervals(); Map<String, Long> getStartBulkRefreshIntervals();
Map<String, Long> getStopBulkRefreshIntervals(); Map<String, Long> getStopBulkRefreshIntervals();
} }

@ -1,11 +1,8 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.api;
import org.xbib.metrics.Count; import org.xbib.metrics.Count;
import org.xbib.metrics.Metered; import org.xbib.metrics.Metered;
public interface BulkMetric { public interface BulkMetric {
Metered getTotalIngest(); Metered getTotalIngest();
@ -27,5 +24,4 @@ public interface BulkMetric {
void stop(); void stop();
long elapsed(); long elapsed();
} }

@ -1,11 +1,11 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.api;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import; import;
import; import;
import java.util.List; import java.util.List;
@ -13,38 +13,53 @@ import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
/** /**
* Interface for providing convenient administrative methods for ingesting data into Elasticsearch. * Interface for providing extended administrative methods for managing and ingesting Elasticsearch.
*/ */
public interface ClientMethods extends Parameters { public interface ExtendedClient {
/** /**
* Initialize new ingest client, wrap an existing Elasticsearch client, and set up metrics. * Set an Elasticsearch client to extend from it. May be null for TransportClient.
* * @param client client
* @param client the Elasticsearch client * @return an ELasticsearch client
* @param metric metric
* @param control control
* @return this ingest
* @throws IOException if client could not get created
*/ */
ClientMethods init(ElasticsearchClient client, BulkMetric metric, BulkControl control) throws IOException; ExtendedClient setClient(ElasticsearchClient client);
* Initialize, create new ingest client, and set up metrics.
* @param settings settings
* @param metric metric
* @param control control
* @return this ingest
* @throws IOException if client could not get created
ClientMethods init(Settings settings, BulkMetric metric, BulkControl control) throws IOException;
/** /**
* Return Elasticsearch client. * Return Elasticsearch client.
* *
* @return Elasticsearch client * @return Elasticsearch client
*/ */
ElasticsearchClient client(); ElasticsearchClient getClient();
ExtendedClient setBulkMetric(BulkMetric bulkMetric);
BulkMetric getBulkMetric();
ExtendedClient setBulkControl(BulkControl bulkControl);
BulkControl getBulkControl();
* Create new Elasticsearch client, wrap an existing Elasticsearch client.
* @param settings settings
* @return this client
* @throws IOException if init fails
ExtendedClient init(Settings settings) throws IOException;
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param index the index
* @param type the type
* @param id the id
* @param create true if document must be created
* @param source the source
* @return this
ExtendedClient index(String index, String type, String id, boolean create, BytesReference source);
/** /**
* Index document. * Index document.
@ -52,10 +67,20 @@ public interface ClientMethods extends Parameters {
* @param index the index * @param index the index
* @param type the type * @param type the type
* @param id the id * @param id the id
* @param create true if document is to be created, false otherwise
* @param source the source * @param source the source
* @return this * @return this client methods
*/ */
ClientMethods index(String index, String type, String id, String source); ExtendedClient index(String index, String type, String id, boolean create, String source);
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param indexRequest the index request to add
* @return this ingest
ExtendedClient indexRequest(IndexRequest indexRequest);
/** /**
* Delete document. * Delete document.
@ -65,7 +90,29 @@ public interface ClientMethods extends Parameters {
* @param id the id * @param id the id
* @return this ingest * @return this ingest
*/ */
ClientMethods delete(String index, String type, String id); ExtendedClient delete(String index, String type, String id);
* Bulked delete request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param deleteRequest the delete request to add
* @return this ingest
ExtendedClient deleteRequest(DeleteRequest deleteRequest);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
ExtendedClient update(String index, String type, String id, BytesReference source);
/** /**
* Update document. Use with precaution! Does not work in all cases. * Update document. Use with precaution! Does not work in all cases.
@ -76,7 +123,17 @@ public interface ClientMethods extends Parameters {
* @param source the source * @param source the source
* @return this * @return this
*/ */
ClientMethods update(String index, String type, String id, String source); ExtendedClient update(String index, String type, String id, String source);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param updateRequest the update request to add
* @return this ingest
ExtendedClient updateRequest(UpdateRequest updateRequest);
/** /**
* Set the maximum number of actions per request. * Set the maximum number of actions per request.
@ -84,7 +141,7 @@ public interface ClientMethods extends Parameters {
* @param maxActionsPerRequest maximum number of actions per request * @param maxActionsPerRequest maximum number of actions per request
* @return this ingest * @return this ingest
*/ */
ClientMethods maxActionsPerRequest(int maxActionsPerRequest); ExtendedClient maxActionsPerRequest(int maxActionsPerRequest);
/** /**
* Set the maximum concurent requests. * Set the maximum concurent requests.
@ -92,7 +149,7 @@ public interface ClientMethods extends Parameters {
* @param maxConcurentRequests maximum number of concurrent ingest requests * @param maxConcurentRequests maximum number of concurrent ingest requests
* @return this Ingest * @return this Ingest
*/ */
ClientMethods maxConcurrentRequests(int maxConcurentRequests); ExtendedClient maxConcurrentRequests(int maxConcurentRequests);
/** /**
* Set the maximum volume for request before flush. * Set the maximum volume for request before flush.
@ -100,7 +157,7 @@ public interface ClientMethods extends Parameters {
* @param maxVolume maximum volume * @param maxVolume maximum volume
* @return this ingest * @return this ingest
*/ */
ClientMethods maxVolumePerRequest(String maxVolume); ExtendedClient maxVolumePerRequest(String maxVolume);
/** /**
* Set the flush interval for automatic flushing outstanding ingest requests. * Set the flush interval for automatic flushing outstanding ingest requests.
@ -108,7 +165,7 @@ public interface ClientMethods extends Parameters {
* @param flushInterval the flush interval, default is 30 seconds * @param flushInterval the flush interval, default is 30 seconds
* @return this ingest * @return this ingest
*/ */
ClientMethods flushIngestInterval(String flushInterval); ExtendedClient flushIngestInterval(String flushInterval);
/** /**
* Set mapping. * Set mapping.
@ -141,7 +198,7 @@ public interface ClientMethods extends Parameters {
* @param index index * @param index index
* @return this ingest * @return this ingest
*/ */
ClientMethods newIndex(String index); ExtendedClient newIndex(String index);
/** /**
* Create a new index. * Create a new index.
@ -153,7 +210,7 @@ public interface ClientMethods extends Parameters {
* @return this ingest * @return this ingest
* @throws IOException if new index creation fails * @throws IOException if new index creation fails
*/ */
ClientMethods newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException; ExtendedClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException;
/** /**
* Create a new index. * Create a new index.
@ -163,7 +220,7 @@ public interface ClientMethods extends Parameters {
* @param mappings mappings * @param mappings mappings
* @return this ingest * @return this ingest
*/ */
ClientMethods newIndex(String index, Settings settings, Map<String, String> mappings); ExtendedClient newIndex(String index, Settings settings, Map<String, String> mappings);
/** /**
* Create new mapping. * Create new mapping.
@ -173,7 +230,7 @@ public interface ClientMethods extends Parameters {
* @param mapping mapping * @param mapping mapping
* @return this ingest * @return this ingest
*/ */
ClientMethods newMapping(String index, String type, Map<String, Object> mapping); ExtendedClient newMapping(String index, String type, Map<String, Object> mapping);
/** /**
* Delete index. * Delete index.
@ -181,7 +238,7 @@ public interface ClientMethods extends Parameters {
* @param index index * @param index index
* @return this ingest * @return this ingest
*/ */
ClientMethods deleteIndex(String index); ExtendedClient deleteIndex(String index);
/** /**
* Start bulk mode. * Start bulk mode.
@ -192,7 +249,8 @@ public interface ClientMethods extends Parameters {
* @return this ingest * @return this ingest
* @throws IOException if bulk could not be started * @throws IOException if bulk could not be started
*/ */
ClientMethods startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException; ExtendedClient startBulk(String index, long startRefreshIntervalSeconds,
long stopRefreshIntervalSeconds) throws IOException;
/** /**
* Stops bulk mode. * Stops bulk mode.
@ -201,42 +259,14 @@ public interface ClientMethods extends Parameters {
* @return this Ingest * @return this Ingest
* @throws IOException if bulk could not be stopped * @throws IOException if bulk could not be stopped
*/ */
ClientMethods stopBulk(String index) throws IOException; ExtendedClient stopBulk(String index) throws IOException;
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param indexRequest the index request to add
* @return this ingest
ClientMethods bulkIndex(IndexRequest indexRequest);
* Bulked delete request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param deleteRequest the delete request to add
* @return this ingest
ClientMethods bulkDelete(DeleteRequest deleteRequest);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized!
* @param updateRequest the update request to add
* @return this ingest
ClientMethods bulkUpdate(UpdateRequest updateRequest);
/** /**
* Flush ingest, move all pending documents to the cluster. * Flush ingest, move all pending documents to the cluster.
* *
* @return this * @return this
*/ */
ClientMethods flushIngest(); ExtendedClient flushIngest();
/** /**
* Wait for all outstanding responses. * Wait for all outstanding responses.
@ -246,7 +276,7 @@ public interface ClientMethods extends Parameters {
* @throws InterruptedException if wait is interrupted * @throws InterruptedException if wait is interrupted
* @throws ExecutionException if execution failed * @throws ExecutionException if execution failed
*/ */
ClientMethods waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException; ExtendedClient waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException;
/** /**
* Refresh the index. * Refresh the index.
@ -363,11 +393,10 @@ public interface ClientMethods extends Parameters {
Long mostRecentDocument(String index, String timestampfieldname) throws IOException; Long mostRecentDocument(String index, String timestampfieldname) throws IOException;
/** /**
* Get metric. * Get cluster name.
* * @return the cluster name
* @return metric
*/ */
BulkMetric getMetric(); String getClusterName();
/** /**
* Returns true is a throwable exists. * Returns true is a throwable exists.
@ -385,6 +414,7 @@ public interface ClientMethods extends Parameters {
/** /**
* Shutdown the ingesting. * Shutdown the ingesting.
* @throws IOException if shutdown fails
*/ */
void shutdown(); void shutdown() throws IOException;
} }

@ -0,0 +1,7 @@
package org.xbib.elx.api;
public interface ExtendedClientProvider<C extends ExtendedClient> {
C getExtendedClient();

@ -1,10 +1,7 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.api;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
@FunctionalInterface @FunctionalInterface
public interface IndexAliasAdder { public interface IndexAliasAdder {

View file

@ -0,0 +1,4 @@
* The API of the Elasticsearch extensions.
package org.xbib.elx.api;

View file

@ -0,0 +1,9 @@
dependencies {
compile project(':elx-api')
compile "org.xbib:guice:${'xbib-guice.version')}"
// add all dependencies to runtime source set, even that which are excluded by Elasticsearch jar,
// for metaprogramming. We are in Groovyland.
runtime "com.vividsolutions:jts:${'jts.version')}"
runtime "${'mustache.version')}"
runtime "${'jna.version')}"

View file

@ -0,0 +1,65 @@
buildscript {
repositories {
maven {
url ''
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:"
apply plugin: ''
configurations {
dependencies {
compile project(':api')
compile "org.xbib:metrics:${'xbib-metrics.version')}"
compileOnly "org.apache.logging.log4j:log4j-api:${'log4j.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
jar {
baseName "${}-common"
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
test {
enabled = false
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', project.buildDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
randomizedTest {
enabled = false
esTest {
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty '', 'true'
esTest.dependsOn jar, testJar

View file

@ -1,4 +1,4 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.common;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
@ -248,11 +248,17 @@ public class BulkProcessor implements Closeable {
public static class Builder { public static class Builder {
private final Client client; private final Client client;
private final Listener listener; private final Listener listener;
private String name; private String name;
private int concurrentRequests = 1; private int concurrentRequests = 1;
private int bulkActions = 1000; private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null; private TimeValue flushInterval = null;
/** /**
@ -281,7 +287,7 @@ public class BulkProcessor implements Closeable {
/** /**
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
* while accumulating new bulk requests. Defaults to <tt>1</tt>. * while accumulating new bulk requests. Defaults to {@code 1}.
* *
* @param concurrentRequests maximum number of concurrent requests * @param concurrentRequests maximum number of concurrent requests
* @return this builder * @return this builder
@ -293,9 +299,9 @@ public class BulkProcessor implements Closeable {
/** /**
* Sets when to flush a new bulk request based on the number of actions currently added. Defaults to * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
* <tt>1000</tt>. Can be set to <tt>-1</tt> to disable it. * {@code 1000}. Can be set to {@code -1} to disable it.
* *
* @param bulkActions mbulk actions * @param bulkActions bulk actions
* @return this builder * @return this builder
*/ */
public Builder setBulkActions(int bulkActions) { public Builder setBulkActions(int bulkActions) {
@ -305,7 +311,7 @@ public class BulkProcessor implements Closeable {
/** /**
* Sets when to flush a new bulk request based on the size of actions currently added. Defaults to * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
* <tt>5mb</tt>. Can be set to <tt>-1</tt> to disable it. * {@code 5mb}. Can be set to {@code -1} to disable it.
* *
* @param bulkSize bulk size * @param bulkSize bulk size
* @return this builder * @return this builder
@ -318,7 +324,7 @@ public class BulkProcessor implements Closeable {
/** /**
* Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set. * Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)} * Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
* can be set to <tt>-1</tt> with the flush interval set allowing for complete async processing of bulk actions. * can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions.
* *
* @param flushInterval flush interval * @param flushInterval flush interval
* @return this builder * @return this builder
@ -365,8 +371,10 @@ public class BulkProcessor implements Closeable {
} }
private class SyncBulkRequestHandler implements BulkRequestHandler { private static class SyncBulkRequestHandler implements BulkRequestHandler {
private final Client client; private final Client client;
private final BulkProcessor.Listener listener; private final BulkProcessor.Listener listener;
SyncBulkRequestHandler(Client client, BulkProcessor.Listener listener) { SyncBulkRequestHandler(Client client, BulkProcessor.Listener listener) {
@ -390,15 +398,19 @@ public class BulkProcessor implements Closeable {
} }
@Override @Override
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { public boolean awaitClose(long timeout, TimeUnit unit) {
return true; return true;
} }
} }
private class AsyncBulkRequestHandler implements BulkRequestHandler { private static class AsyncBulkRequestHandler implements BulkRequestHandler {
private final Client client; private final Client client;
private final BulkProcessor.Listener listener; private final BulkProcessor.Listener listener;
private final Semaphore semaphore; private final Semaphore semaphore;
private final int concurrentRequests; private final int concurrentRequests;
private AsyncBulkRequestHandler(Client client, BulkProcessor.Listener listener, int concurrentRequests) { private AsyncBulkRequestHandler(Client client, BulkProcessor.Listener listener, int concurrentRequests) {
@ -450,8 +462,8 @@ public class BulkProcessor implements Closeable {
@Override @Override
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) { if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
semaphore.release(this.concurrentRequests); semaphore.release(concurrentRequests);
return true; return true;
} }
return false; return false;

@ -0,0 +1,124 @@
package org.xbib.elx.common;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkControl;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.ExtendedClientProvider;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
public class ClientBuilder {
private final ElasticsearchClient client;
private final Settings.Builder settingsBuilder;
private Map<Class<? extends ExtendedClientProvider>, ExtendedClientProvider> providerMap;
private Class<? extends ExtendedClientProvider> provider;
private BulkMetric metric;
private BulkControl control;
public ClientBuilder() {
public ClientBuilder(ElasticsearchClient client) {
this(client, Thread.currentThread().getContextClassLoader());
public ClientBuilder(ElasticsearchClient client, ClassLoader classLoader) {
this.client = client;
this.settingsBuilder = Settings.builder();
settingsBuilder.put("", "elx-client-" + Version.CURRENT);
this.providerMap = new HashMap<>();
ServiceLoader<ExtendedClientProvider> serviceLoader = ServiceLoader.load(ExtendedClientProvider.class,
classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader());
for (ExtendedClientProvider provider : serviceLoader) {
providerMap.put(provider.getClass(), provider);
this.metric = new SimpleBulkMetric();
this.control = new SimpleBulkControl();
public static ClientBuilder builder() {
return new ClientBuilder();
public static ClientBuilder builder(ElasticsearchClient client) {
return new ClientBuilder(client);
public ClientBuilder provider(Class<? extends ExtendedClientProvider> provider) {
this.provider = provider;
return this;
public ClientBuilder put(String key, String value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Integer value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Long value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Double value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, ByteSizeValue value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, TimeValue value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(Settings settings) {
return this;
public ClientBuilder setMetric(BulkMetric metric) {
this.metric = metric;
return this;
public ClientBuilder setControl(BulkControl control) {
this.control = control;
return this;
public <C extends ExtendedClient> C build() throws IOException {
if (provider == null) {
throw new IllegalArgumentException("no provider");
return (C) providerMap.get(provider).getExtendedClient()

@ -0,0 +1,146 @@
package org.xbib.elx.common;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
* Mock client, it does not perform actions on a cluster. Useful for testing or dry runs.
public class MockExtendedClient extends AbstractExtendedClient {
public ElasticsearchClient getClient() {
return null;
public MockExtendedClient init(Settings settings) {
return this;
protected ElasticsearchClient createClient(Settings settings) {
return null;
public MockExtendedClient maxActionsPerRequest(int maxActions) {
return this;
public MockExtendedClient maxConcurrentRequests(int maxConcurrentRequests) {
return this;
public MockExtendedClient maxVolumePerRequest(String maxVolumePerRequest) {
return this;
public MockExtendedClient flushIngestInterval(String interval) {
return this;
public MockExtendedClient index(String index, String type, String id, boolean create, String source) {
return this;
public MockExtendedClient delete(String index, String type, String id) {
return this;
public MockExtendedClient update(String index, String type, String id, String source) {
return this;
public MockExtendedClient indexRequest(IndexRequest indexRequest) {
return this;
public MockExtendedClient deleteRequest(DeleteRequest deleteRequest) {
return this;
public MockExtendedClient updateRequest(UpdateRequest updateRequest) {
return this;
public MockExtendedClient flushIngest() {
return this;
public MockExtendedClient waitForResponses(String timeValue) {
return this;
public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) {
return this;
public MockExtendedClient stopBulk(String index) {
return this;
public MockExtendedClient deleteIndex(String index) {
return this;
public MockExtendedClient newIndex(String index) {
return this;
public MockExtendedClient newMapping(String index, String type, Map<String, Object> mapping) {
return this;
public void putMapping(String index) {
public void refreshIndex(String index) {
public void flushIndex(String index) {
public void waitForCluster(String healthColor, String timeValue) {
public int waitForRecovery(String index) {
return -1;
public int updateReplicaLevel(String index, int level) {
return -1;
public void shutdown() {
// nothing to do

@ -0,0 +1,10 @@
package org.xbib.elx.common;
import org.xbib.elx.api.ExtendedClientProvider;
public class MockExtendedClientProvider implements ExtendedClientProvider<MockExtendedClient> {
public MockExtendedClient getExtendedClient() {
return new MockExtendedClient();

@ -0,0 +1,40 @@
package org.xbib.elx.common;
public enum Parameters {
MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"),
int num;
String string;
Parameters(int num) {
this.num = num;
Parameters(String string) {
this.string = string;
int getNum() {
return num;
String getString() {
return string;

@ -1,4 +1,6 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.common;
import org.xbib.elx.api.BulkControl;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;

View file

@ -1,32 +1,48 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.common;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.Count; import org.xbib.metrics.Count;
import org.xbib.metrics.CountMetric; import org.xbib.metrics.CountMetric;
import org.xbib.metrics.Meter; import org.xbib.metrics.Meter;
import org.xbib.metrics.Metered; import org.xbib.metrics.Metered;
* import java.util.concurrent.Executors;
*/ import java.util.concurrent.ScheduledExecutorService;
public class SimpleBulkMetric implements BulkMetric { public class SimpleBulkMetric implements BulkMetric {
private final Meter totalIngest = new Meter(); private final Meter totalIngest;
private final Count totalIngestSizeInBytes = new CountMetric(); private final Count totalIngestSizeInBytes;
private final Count currentIngest = new CountMetric(); private final Count currentIngest;
private final Count currentIngestNumDocs = new CountMetric(); private final Count currentIngestNumDocs;
private final Count submitted = new CountMetric(); private final Count submitted;
private final Count succeeded = new CountMetric(); private final Count succeeded;
private final Count failed = new CountMetric(); private final Count failed;
private Long started; private Long started;
private Long stopped; private Long stopped;
public SimpleBulkMetric() {
public SimpleBulkMetric(ScheduledExecutorService executorService) {
totalIngest = new Meter(executorService);
totalIngestSizeInBytes = new CountMetric();
currentIngest = new CountMetric();
currentIngestNumDocs = new CountMetric();
submitted = new CountMetric();
succeeded = new CountMetric();
failed = new CountMetric();
@Override @Override
public Metered getTotalIngest() { public Metered getTotalIngest() {
return totalIngest; return totalIngest;
@ -65,7 +81,7 @@ public class SimpleBulkMetric implements BulkMetric {
@Override @Override
public void start() { public void start() {
this.started = System.nanoTime(); this.started = System.nanoTime();
totalIngest.spawn(5L); totalIngest.start(5L);
} }
@Override @Override

@ -0,0 +1,25 @@
public class ClasspathURLStreamHandler extends URLStreamHandler {
private final ClassLoader classLoader;
public ClasspathURLStreamHandler() {
this.classLoader = getClass().getClassLoader();
public ClasspathURLStreamHandler(ClassLoader classLoader) {
this.classLoader = classLoader;
protected URLConnection openConnection(URL u) throws IOException {
final URL resourceUrl = classLoader.getResource(u.getPath());
return resourceUrl != null ? resourceUrl.openConnection() : null;

@ -0,0 +1,12 @@
public class ClasspathURLStreamHandlerFactory implements URLStreamHandlerFactory {
public URLStreamHandler createURLStreamHandler(String protocol) {
return "classpath".equals(protocol) ? new ClasspathURLStreamHandler() : null;

@ -0,0 +1 @@

@ -0,0 +1,139 @@
public class IndexDefinition {
private String index;
private String type;
private String fullIndexName;
private String dateTimePattern;
private URL settingsUrl;
private URL mappingsUrl;
private boolean enabled;
private boolean ignoreErrors;
private boolean switchAliases;
private boolean hasForceMerge;
private int replicaLevel;
private IndexRetention indexRetention;
public IndexDefinition setIndex(String index) {
this.index = index;
return this;
public String getIndex() {
return index;
public IndexDefinition setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName;
return this;
public String getFullIndexName() {
return fullIndexName;
public IndexDefinition setType(String type) {
this.type = type;
return this;
public String getType() {
return type;
public IndexDefinition setSettingsUrl(URL settingsUrl) {
this.settingsUrl = settingsUrl;
return this;
public URL getSettingsUrl() {
return settingsUrl;
public IndexDefinition setMappingsUrl(URL mappingsUrl) {
this.mappingsUrl = mappingsUrl;
return this;
public URL getMappingsUrl() {
return mappingsUrl;
public IndexDefinition setDateTimePattern(String timeWindow) {
this.dateTimePattern = timeWindow;
return this;
public String getDateTimePattern() {
return dateTimePattern;
public IndexDefinition setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
public boolean isEnabled() {
return enabled;
public IndexDefinition setIgnoreErrors(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
return this;
public boolean ignoreErrors() {
return ignoreErrors;
public IndexDefinition setSwitchAliases(boolean switchAliases) {
this.switchAliases = switchAliases;
return this;
public boolean isSwitchAliases() {
return switchAliases;
public IndexDefinition setForceMerge(boolean hasForceMerge) {
this.hasForceMerge = hasForceMerge;
return this;
public boolean hasForceMerge() {
return hasForceMerge;
public IndexDefinition setReplicaLevel(int replicaLevel) {
this.replicaLevel = replicaLevel;
return this;
public int getReplicaLevel() {
return replicaLevel;
public IndexDefinition setRetention(IndexRetention indexRetention) {
this.indexRetention = indexRetention;
return this;
public IndexRetention getRetention() {
return indexRetention;

@ -0,0 +1,27 @@
public class IndexRetention {
private int timestampDiff;
private int minToKeep;
public IndexRetention setTimestampDiff(int timestampDiff) {
this.timestampDiff = timestampDiff;
return this;
public int getTimestampDiff() {
return timestampDiff;
public IndexRetention setMinToKeep(int minToKeep) {
this.minToKeep = minToKeep;
return this;
public int getMinToKeep() {
return minToKeep;

@ -0,0 +1 @@

@ -0,0 +1,4 @@
* Common classes for Elasticsearch client extensions.
package org.xbib.elx.common;

@ -1,7 +1,7 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.common.util;
import org.elasticsearch.common.logging.ESLogger; import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.apache.logging.log4j.Logger;
import; import;
import; import;
@ -11,6 +11,7 @@ import;
import; import;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -20,7 +21,7 @@ import java.util.Locale;
*/ */
public class NetworkUtils { public class NetworkUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(NetworkUtils.class.getName()); private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName());
private static final String IPV4_SETTING = ""; private static final String IPV4_SETTING = "";
@ -102,12 +103,10 @@ public class NetworkUtils {
NetworkInterface networkInterface = interfaces.nextElement(); NetworkInterface networkInterface = interfaces.nextElement();
allInterfaces.add(networkInterface); allInterfaces.add(networkInterface);
Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces(); Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces();
if (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) { while (subInterfaces.hasMoreElements()) {
allInterfaces.add(subInterfaces.nextElement()); allInterfaces.add(subInterfaces.nextElement());
} }
} }
sortInterfaces(allInterfaces); sortInterfaces(allInterfaces);
return allInterfaces; return allInterfaces;
} }
@ -223,18 +222,16 @@ public class NetworkUtils {
NetworkInterface networkInterface = interfaces.nextElement(); NetworkInterface networkInterface = interfaces.nextElement();
networkInterfaces.add(networkInterface); networkInterfaces.add(networkInterface);
Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces(); Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces();
if (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) { while (subInterfaces.hasMoreElements()) {
networkInterfaces.add(subInterfaces.nextElement()); networkInterfaces.add(subInterfaces.nextElement());
} }
} }
sortInterfaces(networkInterfaces); sortInterfaces(networkInterfaces);
return networkInterfaces; return networkInterfaces;
} }
private static void sortInterfaces(List<NetworkInterface> interfaces) { private static void sortInterfaces(List<NetworkInterface> interfaces) {
Collections.sort(interfaces, (o1, o2) ->, o2.getIndex())); Collections.sort(interfaces, Comparator.comparingInt(NetworkInterface::getIndex));
} }
private static void sortAddresses(List<InetAddress> addressList) { private static void sortAddresses(List<InetAddress> addressList) {

View file

package org.xbib.elx.common.util;

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

@ -8,9 +8,6 @@ import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
public class MockNode extends Node { public class MockNode extends Node {
public MockNode() { public MockNode() {
@ -34,5 +31,4 @@ public class MockNode extends Node {
list.add(classpathPlugin); list.add(classpathPlugin);
return list; return list;
} }
} }

View file

package org.elasticsearch.node;

View file

@ -1,9 +1,11 @@
package org.xbib.elasticsearch; package org.xbib.elx.common;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
@ -12,8 +14,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.junit.Test; import org.junit.Test;
import; import;
@ -29,7 +29,7 @@ import java.util.regex.Pattern;
*/ */
public class AliasTest extends NodeTestUtils { public class AliasTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(AliasTest.class.getName()); private static final Logger logger = LogManager.getLogger(AliasTest.class.getName());
@Test @Test
public void testAlias() throws IOException { public void testAlias() throws IOException {
@ -53,7 +53,7 @@ public class AliasTest extends NodeTestUtils {
} }
@Test @Test
public void testMostRecentIndex() throws IOException { public void testMostRecentIndex() {
String alias = "test"; String alias = "test";
CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101"); CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101");
client("1").admin().indices().create(indexRequest).actionGet(); client("1").admin().indices().create(indexRequest).actionGet();
@ -86,7 +86,7 @@ public class AliasTest extends NodeTestUtils {
assertEquals("test20160103",; assertEquals("test20160103",;
assertEquals("test20160102",; assertEquals("test20160102",;
assertEquals("test20160101",; assertEquals("test20160101",;"result={}", result);"success: result={}", result);
} }
} }

@ -0,0 +1,16 @@
package org.xbib.elx.common;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
public class MockExtendedClientProviderTest {
public void testMockExtendedProvider() throws IOException {
MockExtendedClient client = ClientBuilder.builder().provider(MockExtendedClientProvider.class).build();

View file

@ -1,4 +1,4 @@
package org.xbib.elasticsearch.extras.client; package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -9,15 +9,13 @@ import;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
public class NetworkTest { public class NetworkTest {
private static final Logger logger = LogManager.getLogger(NetworkTest.class); private static final Logger logger = LogManager.getLogger(NetworkTest.class);
@Test @Test
public void testNetwork() throws Exception { public void testNetwork() throws Exception {
// walk very slowly over all interfaces
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
for (NetworkInterface netint : Collections.list(nets)) { for (NetworkInterface netint : Collections.list(nets)) {
System.out.println("checking network interface = " + netint.getName()); System.out.println("checking network interface = " + netint.getName());

@ -0,0 +1,213 @@
package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.junit.After;
import org.junit.Before;
import org.xbib.elx.common.util.NetworkUtils;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class NodeTestUtils {
private static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private AtomicInteger counter = new AtomicInteger();
private String cluster;
private String host;
private int port;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
return FileVisitResult.CONTINUE;
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
public void startNodes() {
try {"starting");
try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
} catch (Throwable t) {
logger.error("startNodes failed", t);
public void stopNodes() {
try {
} catch (Exception e) {
logger.error("can not close nodes", e);
} finally {
try {
deleteFiles();"data files wiped");
Thread.sleep(2000L); // let OS commit changes
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
// ignore
protected void setClusterName() {
this.cluster = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("")
+ "-" + counter.incrementAndGet();
protected String getClusterName() {
return cluster;
protected Settings getSettings() {
return settingsBuilder()
.put("host", host)
.put("port", port)
.put("", cluster)
.put("path.home", getHome())
protected Settings getNodeSettings() {
return settingsBuilder()
.put("", cluster)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
.put("discovery.zen.multicast.ping_timeout", "5s")
.put("http.enabled", true)
.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
.put("index.number_of_replicas", 0)
.put("path.home", getHome())
protected static String getHome() {
return System.getProperty("path.home", System.getProperty("user.dir"));
public void startNode(String id) {
public AbstractClient client(String id) {
return clients.get(id);
private void closeNodes() {"closing all clients");
for (AbstractClient client : clients.values()) {
clients.clear();"closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
nodes.clear();"all nodes closed");
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
port = address.address().getPort();
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put("name", id)
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);"clients={}", clients);
return node;
protected String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
return new String(buf);

@ -0,0 +1,56 @@
package org.xbib.elx.common;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class SearchTest extends NodeTestUtils {
public void testSearch() throws Exception {
Client client = client("1");
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1000; i++) {
IndexRequest indexRequest = new IndexRequest("pages", "row")
.field("user1", "joerg")
.field("user2", "joerg")
.field("user3", "joerg")
.field("user4", "joerg")
.field("user5", "joerg")
.field("user6", "joerg")
.field("user7", "joerg")
.field("user8", "joerg")
.field("user9", "joerg")
.field("rowcount", i)
.field("rs", 1234));
client.admin().indices().refresh(new RefreshRequest()).actionGet();
for (int i = 0; i < 100; i++) {
QueryBuilder queryStringBuilder = QueryBuilders.queryStringQuery("rs:" + 1234);
SearchRequestBuilder requestBuilder = client.prepareSearch()
.addSort("rowcount", SortOrder.DESC)
.setFrom(i * 10).setSize(10);
SearchResponse searchResponse = requestBuilder.execute().actionGet();
assertTrue(searchResponse.getHits().getTotalHits() > 0);

@ -1,4 +1,4 @@
package org.xbib.elasticsearch; package org.xbib.elx.common;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -10,16 +10,14 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.junit.Test; import org.junit.Test;
public class SimpleTest extends NodeTestUtils { public class SimpleTest extends NodeTestUtils {
protected Settings getNodeSettings() { protected Settings getNodeSettings() {
return settingsBuilder() return settingsBuilder()
.put("path.home", System.getProperty("path.home")) .put(super.getNodeSettings())
.put("index.analysis.analyzer.default.filter.0", "lowercase") .put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim") .put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword") .put("index.analysis.analyzer.default.tokenizer", "keyword")
@ -32,8 +30,8 @@ public class SimpleTest extends NodeTestUtils {
DeleteIndexRequestBuilder deleteIndexRequestBuilder = DeleteIndexRequestBuilder deleteIndexRequestBuilder =
new DeleteIndexRequestBuilder(client("1"), DeleteIndexAction.INSTANCE, "test"); new DeleteIndexRequestBuilder(client("1"), DeleteIndexAction.INSTANCE, "test");
deleteIndexRequestBuilder.execute().actionGet(); deleteIndexRequestBuilder.execute().actionGet();
} catch (Exception e) { } catch (IndexNotFoundException e) {
// ignore // ignore if index not found
} }
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client("1"), IndexAction.INSTANCE); IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client("1"), IndexAction.INSTANCE);
indexRequestBuilder indexRequestBuilder

@ -0,0 +1,62 @@
package org.xbib.elx.common;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
public class WildcardTest extends NodeTestUtils {
protected Settings getNodeSettings() {
return Settings.settingsBuilder()
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", false)
.put("http.enabled", false)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
public void testWildcard() throws Exception {
index(client("1"), "1", "010");
index(client("1"), "2", "0*0");
// exact
validateCount(client("1"), QueryBuilders.queryStringQuery("010").defaultField("field"), 1);
validateCount(client("1"), QueryBuilders.queryStringQuery("0\\*0").defaultField("field"), 1);
// pattern
validateCount(client("1"), QueryBuilders.queryStringQuery("0*0").defaultField("field"), 1); // 2?
validateCount(client("1"), QueryBuilders.queryStringQuery("0?0").defaultField("field"), 1); // 2?
validateCount(client("1"), QueryBuilders.queryStringQuery("0**0").defaultField("field"), 1); // 2?
validateCount(client("1"), QueryBuilders.queryStringQuery("0??0").defaultField("field"), 0);
validateCount(client("1"), QueryBuilders.queryStringQuery("*10").defaultField("field"), 1);
validateCount(client("1"), QueryBuilders.queryStringQuery("*1*").defaultField("field"), 1);
validateCount(client("1"), QueryBuilders.queryStringQuery("*\\*0").defaultField("field"), 0); // 1?
validateCount(client("1"), QueryBuilders.queryStringQuery("*\\**").defaultField("field"), 0); // 1?
private void index(Client client, String id, String fieldValue) throws IOException {
client.index(new IndexRequest("index", "type", id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())
private long count(Client client, QueryBuilder queryBuilder) {
return client.prepareSearch("index").setTypes("type")
private void validateCount(Client client, QueryBuilder queryBuilder, long expectedHits) {
final long actualHits = count(client, queryBuilder);
if (actualHits != expectedHits) {
throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits);

@ -0,0 +1 @@
package org.xbib.elx.common;

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="[%d{ISO8601}][%-5p][%-25c][%t] %m%n"/>
<Root level="info">
<AppenderRef ref="Console" />

View file

@ -0,0 +1,65 @@
buildscript {
repositories {
maven {
url ''
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:"
apply plugin: ''
configurations {
dependencies {
compile project(':common')
compile "org.xbib:netty-http-client:${'xbib-netty-http-client.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
jar {
baseName "${}-common"
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
test {
enabled = true
include '**/SimpleTest.*'
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
randomizedTest {
enabled = false
esTest {
enabled = true
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty '', 'true'
// maybe we like some extra security policy for our code
systemProperty '', '/extra-security.policy'
esTest.dependsOn jar, testJar

View file

@ -0,0 +1,3 @@
dependencies {
compile project(':elx-common')

View file

@ -0,0 +1,65 @@
buildscript {
repositories {
maven {
url ''
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:"
apply plugin: ''
configurations {
dependencies {
compile project(':common')
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
jar {
baseName "${}-node"
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
test {
enabled = false
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', projectDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
randomizedTest {
enabled = false
esTest {
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty '', 'true'
// maybe we like some extra security policy for our code
systemProperty '', '/extra-security.policy'
esTest.dependsOn jar, testJar

@ -0,0 +1,70 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.xbib.elx.common.AbstractExtendedClient;
import java.util.Collection;
import java.util.Collections;
public class ExtendedNodeClient extends AbstractExtendedClient {
private static final Logger logger = LogManager.getLogger(ExtendedNodeClient.class.getName());
private Node node;
protected ElasticsearchClient createClient(Settings settings) throws IOException {
if (settings != null) {
String version = System.getProperty("")
+ " " + System.getProperty("")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.runtime.version")
+ " " + System.getProperty("java.vm.version");
Settings effectiveSettings = Settings.builder().put(settings)
.put("node.client", true)
.put("node.master", false)
.put("", false)
.build();"creating node client on {} with effective settings {}",
version, effectiveSettings.toString());
Collection<Class<? extends Plugin>> plugins = Collections.emptyList();
this.node = new BulkNode(new Environment(effectiveSettings), plugins);
try {
} catch (Exception e) {
throw new IOException(e);
return node.client();
return null;
public void shutdown() throws IOException {
try {
if (node != null) {
logger.debug("closing node...");
} catch (Exception e) {
logger.error(e.getMessage(), e);
private static class BulkNode extends Node {
BulkNode(Environment env, Collection<Class<? extends Plugin>> classpathPlugins) {
super(env, Version.CURRENT, classpathPlugins);

@ -0,0 +1,10 @@
package org.xbib.elx.node;
import org.xbib.elx.api.ExtendedClientProvider;
public class ExtendedNodeClientProvider implements ExtendedClientProvider<ExtendedNodeClient> {
public ExtendedNodeClient getExtendedClient() {
return new ExtendedNodeClient();

@ -0,0 +1 @@

@ -0,0 +1,34 @@
package org.elasticsearch.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
public class MockNode extends Node {
public MockNode() {
public MockNode(Settings settings) {
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
public MockNode(Settings settings, Class<? extends Plugin> classpathPlugin) {
this(settings, list(classpathPlugin));
private static Collection<Class<? extends Plugin>> list(Class<? extends Plugin> classpathPlugin) {
Collection<Class<? extends Plugin>> list = new ArrayList<>();
return list;

@ -0,0 +1,58 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
public class ExtendeNodeDuplicateIDTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendeNodeDuplicateIDTest.class.getSimpleName());
private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
private static final Long ACTIONS = 12345L;
public void testDuplicateDocIDs() throws Exception {
long numactions = ACTIONS;
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
try {
for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();"hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

@ -0,0 +1,39 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ExtendedNodeClientSingleNodeTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeClientSingleNodeTest.class.getSimpleName());
public void testSingleDocNodeClient() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
try {
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

@ -1,43 +1,37 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elx.node;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import; import;
import; import;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elx.common.Parameters;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** public class ExtendedNodeClientTest extends NodeTestUtils {
public class BulkNodeClientTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeClientTest.class.getSimpleName()); private static final Logger logger = LogManager.getLogger(ExtendedNodeClientTest.class.getSimpleName());
private static final Long MAX_ACTIONS = 1000L; private static final Long ACTIONS = 25000L;
private static final Long NUM_ACTIONS = 1234L; private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
@Before @Before
public void startNodes() { public void startNodes() {
@ -49,13 +43,38 @@ public class BulkNodeClientTest extends NodeTestUtils {
} }
} }
public void testSingleDocNodeClient() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.put(, TimeValue.timeValueSeconds(30))
try {
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
@Test @Test
public void testNewIndexNodeClient() throws Exception { public void testNewIndexNodeClient() throws Exception {
final BulkNodeClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) .provider(ExtendedNodeClientProvider.class)
.setMetric(new SimpleBulkMetric()) .put(, TimeValue.timeValueSeconds(5))
.setControl(new SimpleBulkControl()) .build();
client.newIndex("test"); client.newIndex("test");
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
@ -66,11 +85,10 @@ public class BulkNodeClientTest extends NodeTestUtils {
@Test @Test
public void testMappingNodeClient() throws Exception { public void testMappingNodeClient() throws Exception {
final BulkNodeClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(5)) .provider(ExtendedNodeClientProvider.class)
.setMetric(new SimpleBulkMetric()) .put(, TimeValue.timeValueSeconds(5))
.setControl(new SimpleBulkControl()) .build();
XContentBuilder builder = jsonBuilder() XContentBuilder builder = jsonBuilder()
.startObject() .startObject()
.startObject("test") .startObject("test")
@ -85,7 +103,7 @@ public class BulkNodeClientTest extends NodeTestUtils {
client.newIndex("test"); client.newIndex("test");
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test"); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
GetMappingsResponse getMappingsResponse = GetMappingsResponse getMappingsResponse =
client.client().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet(); client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();"mappings={}", getMappingsResponse.getMappings());"mappings={}", getMappingsResponse.getMappings());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
@ -94,59 +112,34 @@ public class BulkNodeClientTest extends NodeTestUtils {
client.shutdown(); client.shutdown();
} }
public void testSingleDocNodeClient() {
final BulkNodeClient client = Clients.builder()
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(30))
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
try {
client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
} finally {
assertEquals(1, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
@Test @Test
public void testRandomDocsNodeClient() throws Exception { public void testRandomDocsNodeClient() throws Exception {
long numactions = NUM_ACTIONS; long numactions = ACTIONS;
final BulkNodeClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.put(Clients.MAX_ACTIONS_PER_REQUEST, MAX_ACTIONS) .provider(ExtendedNodeClientProvider.class)
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(, MAX_ACTIONS_PER_REQUEST)
.setMetric(new SimpleBulkMetric()) .put(, TimeValue.timeValueSeconds(60))
.setControl(new SimpleBulkControl()) .build();
try { try {
client.newIndex("test"); client.newIndex("test");
for (int i = 0; i < NUM_ACTIONS; i++) { for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
assertEquals(numactions, client.getMetric().getSucceeded().getCount()); assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
client.shutdown(); client.shutdown();
} }
} }
@ -154,15 +147,15 @@ public class BulkNodeClientTest extends NodeTestUtils {
@Test @Test
public void testThreadedRandomDocsNodeClient() throws Exception { public void testThreadedRandomDocsNodeClient() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors(); int maxthreads = Runtime.getRuntime().availableProcessors();
Long maxactions = MAX_ACTIONS; Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
final Long maxloop = NUM_ACTIONS; final Long actions = ACTIONS;"NodeClient max={} maxactions={} maxloop={}", maxthreads, maxactions, maxloop);"NodeClient max={} maxactions={} maxloop={}", maxthreads, maxActionsPerRequest, actions);
final BulkNodeClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.put(Clients.MAX_ACTIONS_PER_REQUEST, maxactions) .provider(ExtendedNodeClientProvider.class)
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60))// disable auto flush for this test .put(, maxthreads * 2)
.setMetric(new SimpleBulkMetric()) .put(, maxActionsPerRequest)
.setControl(new SimpleBulkControl()) .put(, TimeValue.timeValueSeconds(60))
.toBulkNodeClient(client("1")); .build();
try { try {
client.newIndex("test") client.newIndex("test")
.startBulk("test", -1, 1000); .startBulk("test", -1, 1000);
@ -170,39 +163,39 @@ public class BulkNodeClientTest extends NodeTestUtils {
EsExecutors.daemonThreadFactory("bulk-nodeclient-test")); EsExecutors.daemonThreadFactory("bulk-nodeclient-test"));
final CountDownLatch latch = new CountDownLatch(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) { for (int i = 0; i < maxthreads; i++) {
pool.execute(new Runnable() { pool.execute(() -> {
public void run() { for (int i1 = 0; i1 < actions; i1++) {
for (int i = 0; i < maxloop; i++) { client.index("test", "test", null, false,"{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
latch.countDown(); latch.countDown();
}); });
} }"waiting for max 30 seconds...");"waiting for latch...");
latch.await(30, TimeUnit.SECONDS); if (latch.await(5, TimeUnit.MINUTES)) {"flush...");"last flush...");
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("60s");"got all responses, thread pool shutdown...");"got all responses, pool shutdown...");
pool.shutdown(); pool.shutdown();"pool is shut down");"pool is shut down");
} else {
logger.warn("latch timeout");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
client.stopBulk("test"); client.stopBulk("test");
assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
client.refreshIndex("test"); client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setQuery(QueryBuilders.matchAllQuery()).setSize(0); .setQuery(QueryBuilders.matchAllQuery()).setSize(0);
assertEquals(maxthreads * maxloop, assertEquals(maxthreads * actions,
searchRequestBuilder.execute().actionGet().getHits().getTotalHits()); searchRequestBuilder.execute().actionGet().getHits().getTotalHits());
client.shutdown(); client.shutdown();
} }
} }
} }

@ -1,24 +1,22 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/** @Ignore
* public class ExtendedNodeClusterBlockTest extends NodeTestUtils {
public class BulkNodeClusterBlockTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger("test"); private static final Logger logger = LogManager.getLogger("test");
@Before @Before
public void startNodes() { public void startNodes() {
@ -33,6 +31,7 @@ public class BulkNodeClusterBlockTest extends NodeTestUtils {
} }
} }
protected Settings getNodeSettings() { protected Settings getNodeSettings() {
return Settings.settingsBuilder() return Settings.settingsBuilder()
.put(super.getNodeSettings()) .put(super.getNodeSettings())
@ -44,8 +43,7 @@ public class BulkNodeClusterBlockTest extends NodeTestUtils {
public void testClusterBlock() throws Exception { public void testClusterBlock() throws Exception {
BulkRequestBuilder brb = client("1").prepareBulk(); BulkRequestBuilder brb = client("1").prepareBulk();
XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject(); XContentBuilder builder = jsonBuilder().startObject().field("field1", "value1").endObject();
String jsonString = builder.string(); IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1").setSource(builder);
IndexRequestBuilder irb = client("1").prepareIndex("test", "test", "1").setSource(jsonString);
brb.add(irb); brb.add(irb);
brb.execute().actionGet(); brb.execute().actionGet();
} }

@ -1,17 +1,12 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elx.node;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.IndexAliasAdder;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -19,23 +14,20 @@ import java.util.Map;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
/** @Ignore
* public class ExtendedNodeIndexAliasTest extends NodeTestUtils {
public class BulkNodeIndexAliasTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeIndexAliasTest.class.getSimpleName()); private static final Logger logger = LogManager.getLogger(ExtendedNodeIndexAliasTest.class.getSimpleName());
@Test @Test
public void testIndexAlias() throws Exception { public void testIndexAlias() throws Exception {
final BulkNodeClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.setMetric(new SimpleBulkMetric()) .provider(ExtendedNodeClientProvider.class)
.setControl(new SimpleBulkControl()) .build();
try { try {
client.newIndex("test1234"); client.newIndex("test1234");
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
client.index("test1234", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test1234", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.refreshIndex("test1234"); client.refreshIndex("test1234");
@ -45,18 +37,14 @@ public class BulkNodeIndexAliasTest extends NodeTestUtils {
client.newIndex("test5678"); client.newIndex("test5678");
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
client.index("test5678", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test5678", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.refreshIndex("test5678"); client.refreshIndex("test5678");
simpleAliases = Arrays.asList("d", "e", "f"); simpleAliases = Arrays.asList("d", "e", "f");
client.switchAliases("test", "test5678", simpleAliases, new IndexAliasAdder() { client.switchAliases("test", "test5678", simpleAliases, (builder, index, alias) ->
@Override builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
public void addIndexAlias(IndicesAliasesRequestBuilder builder, String index, String alias) {
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias));
Map<String, String> aliases = client.getIndexFilters("test5678"); Map<String, String> aliases = client.getIndexFilters("test5678");"aliases of index test5678 = {}", aliases);"aliases of index test5678 = {}", aliases);

View file

@ -1,5 +1,7 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
@ -9,15 +11,11 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import; import;
import; import;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.IndexingStats;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.Map; import java.util.Map;
@ -25,9 +23,10 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
public class BulkNodeReplicaTest extends NodeTestUtils { @Ignore
public class ExtendedNodeReplicaTest extends NodeTestUtils {
private final static ESLogger logger = ESLoggerFactory.getLogger(BulkNodeReplicaTest.class.getSimpleName()); private static final Logger logger = LogManager.getLogger(ExtendedNodeReplicaTest.class.getSimpleName());
@Test @Test
public void testReplicaLevel() throws Exception { public void testReplicaLevel() throws Exception {
@ -47,20 +46,19 @@ public class BulkNodeReplicaTest extends NodeTestUtils {
.put("index.number_of_replicas", 1) .put("index.number_of_replicas", 1)
.build(); .build();
final BulkNodeClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.setMetric(new SimpleBulkMetric()) .provider(ExtendedNodeClientProvider.class)
.setControl(new SimpleBulkControl()) .build();
try { try {
client.newIndex("test1", settingsTest1, null) client.newIndex("test1", settingsTest1, null)
.newIndex("test2", settingsTest2, null); .newIndex("test2", settingsTest2, null);
client.waitForCluster("GREEN", "30s"); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test1", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test2", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");
@ -70,13 +68,13 @@ public class BulkNodeReplicaTest extends NodeTestUtils {"refreshing");"refreshing");
client.refreshIndex("test1"); client.refreshIndex("test1");
client.refreshIndex("test2"); client.refreshIndex("test2");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test1", "test2") .setIndices("test1", "test2")
.setQuery(matchAllQuery()); .setQuery(matchAllQuery());
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits(); long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();"query total hits={}", hits);"query total hits={}", hits);
assertEquals(2468, hits); assertEquals(2468, hits);
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.client(), IndicesStatsAction.INSTANCE) IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.getClient(), IndicesStatsAction.INSTANCE)
.all(); .all();
IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet(); IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet();
for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) { for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) {

@ -1,54 +1,47 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
/** @Ignore
* public class ExtendedNodeUpdateReplicaLevelTest extends NodeTestUtils {
public class BulkTransportUpdateReplicaLevelTest extends NodeTestUtils {
private static final ESLogger logger = private static final Logger logger = LogManager.getLogger(ExtendedNodeUpdateReplicaLevelTest.class.getSimpleName());
@Test @Test
public void testUpdateReplicaLevel() throws Exception { public void testUpdateReplicaLevel() throws Exception {
int numberOfShards = 2; long numberOfShards = 2;
int replicaLevel = 3; int replicaLevel = 3;
// we need 3 nodes for replica level 3 // we need 3 nodes for replica level 3
startNode("2"); startNode("2");
startNode("3"); startNode("3");
int shardsAfterReplica; long shardsAfterReplica;
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("index.number_of_shards", numberOfShards) .put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.build(); .build();
final BulkTransportClient client = Clients.builder() final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.put(getSettings()) .provider(ExtendedNodeClientProvider.class)
.setMetric(new SimpleBulkMetric()) .build();
.setControl(new SimpleBulkControl())
try { try {
client.newIndex("replicatest", settings, null); client.newIndex("replicatest", settings, null);
client.waitForCluster("GREEN", "30s"); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) { for (int i = 0; i < 12345; i++) {
client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("replicatest", "replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");

@ -0,0 +1,201 @@
package org.xbib.elx.node;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.junit.After;
import org.junit.Before;
import org.xbib.elx.common.util.NetworkUtils;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class NodeTestUtils {
private static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random();
private static char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private AtomicInteger counter = new AtomicInteger();
private String cluster;
private String host;
private int port;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
return FileVisitResult.CONTINUE;
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
public void startNodes() {
try {"starting");
try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
} catch (Throwable t) {
logger.error("startNodes failed", t);
public void stopNodes() {
try {
} catch (Exception e) {
logger.error("can not close nodes", e);
} finally {
try {
deleteFiles();"data files wiped");
Thread.sleep(2000L); // let OS commit changes
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
// ignore
protected void setClusterName() {
this.cluster = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("")
+ "-" + counter.incrementAndGet();
protected Settings getNodeSettings() {
return settingsBuilder()
.put("", cluster)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
.put("discovery.zen.multicast.ping_timeout", "5s")
.put("http.enabled", true)
.put("threadpool.bulk.size", Runtime.getRuntime().availableProcessors())
.put("threadpool.bulk.queue_size", 16 * Runtime.getRuntime().availableProcessors()) // default is 50, too low
.put("index.number_of_replicas", 0)
.put("path.home", getHome())
protected static String getHome() {
return System.getProperty("path.home", System.getProperty("user.dir"));
public void startNode(String id) {
public AbstractClient client(String id) {
return clients.get(id);
private void closeNodes() {"closing all clients");
for (AbstractClient client : clients.values()) {
clients.clear();"closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
nodes.clear();"all nodes closed");
protected void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Object obj = response.iterator().next().getTransport().getAddress()
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
port = address.address().getPort();
private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder()
.put("name", id)
.build();"settings={}", nodeSettings.getAsMap());
Node node = new MockNode(nodeSettings);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);"clients={}", clients);
return node;
protected String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
return new String(buf);

@ -2,7 +2,7 @@
<configuration status="OFF"> <configuration status="OFF">
<appenders> <appenders>
<Console name="Console" target="SYSTEM_OUT"> <Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="[%d{ABSOLUTE}][%-5p][%-25c][%t] %m%n"/> <PatternLayout pattern="[%d{ISO8601}][%-5p][%-25c][%t] %m%n"/>
</Console> </Console>
</appenders> </appenders>
<Loggers> <Loggers>

@ -0,0 +1,3 @@
dependencies {
compile project(':elx-common')

View file

@ -0,0 +1,63 @@
buildscript {
repositories {
maven {
url ''
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:"
apply plugin: ''
configurations {
dependencies {
compile project(':common')
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
jar {
baseName "${}-transport"
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
esTest {
enabled = true
// test with the jars, not the classes, for security manager
classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty '', 'true'
// maybe we like some extra security policy for our code
systemProperty '', '/extra-security.policy'
esTest.dependsOn jar, testJar
randomizedTest {
enabled = false
test {
enabled = false
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', projectDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'

@ -0,0 +1,129 @@
package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.xbib.elx.common.AbstractExtendedClient;
import org.xbib.elx.common.util.NetworkUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
* Transport client with additional methods using the BulkProcessor.
public class ExtendedTransportClient extends AbstractExtendedClient {
private static final Logger logger = LogManager.getLogger(ExtendedTransportClient.class.getName());
protected ElasticsearchClient createClient(Settings settings) {
if (settings != null) {
String systemIdentifier = System.getProperty("")
+ " " + System.getProperty("")
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();"creating transport client on {} with effective settings {}",
systemIdentifier, settings.getAsMap());
TransportClient.Builder builder = TransportClient.builder()
.put("", settings.get(""))
.put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.ignore_cluster_name", true)
return null;
public ExtendedTransportClient init(Settings settings) throws IOException {
// additional auto-connect
try {
Collection<InetSocketTransportAddress> addrs = findAddresses(settings);
if (!connect(addrs, settings.getAsBoolean("autodiscover", false))) {
throw new NoNodeAvailableException("no cluster nodes available, check settings "
+ settings.toString());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return this;
public synchronized void shutdown() throws IOException {
super.shutdown();"shutting down...");
if (getClient() != null) {
TransportClient client = (TransportClient) getClient();
}"shutting down completed");
private Collection<InetSocketTransportAddress> findAddresses(Settings settings) throws IOException {
final int defaultPort = settings.getAsInt("port", 9300);
Collection<InetSocketTransportAddress> addresses = new ArrayList<>();
for (String hostname : settings.getAsArray("host")) {
String[] splitHost = hostname.split(":", 2);
if (splitHost.length == 2) {
try {
String host = splitHost[0];
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
int port = Integer.parseInt(splitHost[1]);
InetSocketTransportAddress address = new InetSocketTransportAddress(inetAddress, port);
} catch (NumberFormatException e) {
logger.warn(e.getMessage(), e);
if (splitHost.length == 1) {
String host = splitHost[0];
InetAddress inetAddress = NetworkUtils.resolveInetAddress(host, null);
InetSocketTransportAddress address = new InetSocketTransportAddress(inetAddress, defaultPort);
return addresses;
private boolean connect(Collection<InetSocketTransportAddress> addresses, boolean autodiscover) {
if (getClient() == null) {
throw new IllegalStateException("no client present");
logger.debug("trying to connect to {}", addresses);
TransportClient transportClient = (TransportClient) getClient();
List<DiscoveryNode> nodes = transportClient.connectedNodes();"connected to nodes = {}", nodes);
if (nodes != null && !nodes.isEmpty()) {
if (autodiscover) {
logger.debug("trying to auto-discover all nodes...");
ClusterStateRequestBuilder clusterStateRequestBuilder =
new ClusterStateRequestBuilder(getClient(), ClusterStateAction.INSTANCE);
ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
DiscoveryNodes discoveryNodes = clusterStateResponse.getState().getNodes();
transportClient.addDiscoveryNodes(discoveryNodes);"after auto-discovery: connected to {}", transportClient.connectedNodes());
return true;
return false;

@ -0,0 +1,11 @@
package org.xbib.elx.transport;
import org.xbib.elx.api.ExtendedClientProvider;
public class ExtendedTransportClientProvider implements ExtendedClientProvider<ExtendedTransportClient> {
public ExtendedTransportClient getExtendedClient() {
return new ExtendedTransportClient();

View file

@ -1,9 +1,8 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elx.transport;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -25,7 +24,6 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterNameModule; import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injector;
@ -54,6 +52,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -278,16 +277,11 @@ public class TransportClient extends AbstractClient {
} }
} }
try { try {
FutureTransportResponseHandler<LivenessResponse> responseHandler = new LivenessResponseHandler();
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, LivenessResponse livenessResponse = transportService.submitRequest(listedNode,
TransportLivenessAction.NAME, headers.applyTo(new LivenessRequest()), TransportLivenessAction.NAME, headers.applyTo(new LivenessRequest()),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE) TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(), .withTimeout(pingTimeout).build(),responseHandler).txGet();
new FutureTransportResponseHandler<LivenessResponse>() {
public LivenessResponse newInstance() {
return new LivenessResponse();
if (!clusterName.equals(livenessResponse.getClusterName())) { if (!clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName); logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
newFilteredNodes.add(listedNode); newFilteredNodes.add(listedNode);
@ -347,12 +341,10 @@ public class TransportClient extends AbstractClient {
} }
} }
public static class Builder { public static class Builder {
private Settings settings = Settings.EMPTY; private Settings settings = Settings.EMPTY;
private List<Class<? extends Plugin>> pluginClasses = new ArrayList<>(); private List<Class<? extends Plugin>> pluginClasses = new ArrayList<>();
public Builder settings(Settings.Builder settings) { public Builder settings(Settings.Builder settings) {
@ -395,12 +387,7 @@ public class TransportClient extends AbstractClient {
modules.add(new ClusterNameModule(this.settings)); modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool)); modules.add(new ThreadPoolModule(threadPool));
modules.add(new TransportModule(this.settings)); modules.add(new TransportModule(this.settings));
modules.add(new SearchModule() { modules.add(new TransportSearchModule());
protected void configure() {
// noop
modules.add(new ActionModule(true)); modules.add(new ActionModule(true));
modules.add(new ClientTransportModule()); modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings)); modules.add(new CircuitBreakerModule(this.settings));
@ -419,27 +406,39 @@ public class TransportClient extends AbstractClient {
} }
/** /**
* The {@link ProxyActionMap} must be declared public. * The {@link ProxyActionMap} must be declared public for injection.
*/ */
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public static class ProxyActionMap { public static class ProxyActionMap {
private final ImmutableMap<Action, TransportActionNodeProxy> proxies; private final Map<Action, TransportActionNodeProxy> proxies;
@Inject @Inject
public ProxyActionMap(Settings settings, TransportService transportService, Map<String, GenericAction> actions) { public ProxyActionMap(Settings settings, TransportService transportService, Map<String, GenericAction> actions) {
MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>(); this.proxies = new LinkedHashMap<>();
for (GenericAction action : actions.values()) { for (GenericAction action : actions.values()) {
if (action instanceof Action) { if (action instanceof Action) {
actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService)); this.proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
} }
} }
this.proxies = actionsBuilder.immutableMap();
} }
public ImmutableMap<Action, TransportActionNodeProxy> getProxies() { public Map<Action, TransportActionNodeProxy> getProxies() {
return proxies; return proxies;
} }
} }
private static class LivenessResponseHandler extends FutureTransportResponseHandler<LivenessResponse> {
public LivenessResponse newInstance() {
return new LivenessResponse();
private static class TransportSearchModule extends SearchModule {
protected void configure() {
// noop
} }

@ -0,0 +1,4 @@
* Classes for Elasticsearch transport client extensions.
package org.xbib.elx.transport;

@ -0,0 +1 @@

@ -0,0 +1,34 @@
package org.elasticsearch.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
public class MockNode extends Node {
public MockNode() {
public MockNode(Settings settings) {
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
public MockNode(Settings settings, Class<? extends Plugin> classpathPlugin) {
this(settings, list(classpathPlugin));
private static Collection<Class<? extends Plugin>> list(Class<? extends Plugin> classpathPlugin) {
Collection<Class<? extends Plugin>> list = new ArrayList<>();
return list;

package org.elasticsearch.node;

@ -0,0 +1,40 @@
package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportClientSingleNodeTest.class.getSimpleName());
public void testSingleDocNodeClient() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
try {
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

@ -1,40 +1,34 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import; import;
import; import;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients; import org.xbib.elx.common.Parameters;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
public class BulkTransportClientTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportClientTest.class.getSimpleName()); public class ExtendedTransportClientTest extends NodeTestUtils {
private static final Long MAX_ACTIONS = 1000L; private static final Logger logger = LogManager.getLogger(ExtendedTransportClientTest.class.getSimpleName());
private static final Long NUM_ACTIONS = 1234L; private static final Long MAX_ACTIONS_PER_REQUEST = 1000L;
private static final Long ACTIONS = 1234L;
@Before @Before
public void startNodes() { public void startNodes() {
@ -47,13 +41,12 @@ public class BulkTransportClientTest extends NodeTestUtils {
} }
@Test @Test
public void testBulkClient() throws IOException { public void testBulkClient() throws Exception {
final BulkTransportClient client = Clients.builder() final ExtendedTransportClient client = ClientBuilder.builder()
.put(getSettings()) .put(getSettings())
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .build();
.setControl(new SimpleBulkControl())
client.newIndex("test"); client.newIndex("test");
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
@ -75,27 +68,24 @@ public class BulkTransportClientTest extends NodeTestUtils {
} }
@Test @Test
public void testSingleDocBulkClient() throws IOException { public void testSingleDocBulkClient() throws Exception {
final BulkTransportClient client = Clients.builder() final ExtendedTransportClient client = ClientBuilder.builder()
.put(getSettings()) .put(getSettings())
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .build();
.setControl(new SimpleBulkControl())
try { try {
client.newIndex("test"); client.newIndex("test");
client.index("test", "test", "1", "{ \"name\" : \"Hello World\"}"); // single doc ingest client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}");
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
assertEquals(1, client.getMetric().getSucceeded().getCount()); assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
@ -105,30 +95,27 @@ public class BulkTransportClientTest extends NodeTestUtils {
} }
@Test @Test
public void testRandomDocsBulkClient() throws IOException { public void testRandomDocsBulkClient() throws Exception {
long numactions = NUM_ACTIONS; long numactions = ACTIONS;
final BulkTransportClient client = Clients.builder() final ExtendedTransportClient client = ClientBuilder.builder()
.put(getSettings()) .put(getSettings())
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) .put(, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .build();
.setControl(new SimpleBulkControl())
try { try {
client.newIndex("test"); client.newIndex("test");
for (int i = 0; i < NUM_ACTIONS; i++) { for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
assertEquals(numactions, client.getMetric().getSucceeded().getCount()); assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
@ -140,54 +127,54 @@ public class BulkTransportClientTest extends NodeTestUtils {
@Test @Test
public void testThreadedRandomDocsBulkClient() throws Exception { public void testThreadedRandomDocsBulkClient() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors(); int maxthreads = Runtime.getRuntime().availableProcessors();
long maxactions = MAX_ACTIONS; long maxactions = MAX_ACTIONS_PER_REQUEST;
final long maxloop = NUM_ACTIONS; final long maxloop = ACTIONS;
Settings settingsForIndex = Settings.settingsBuilder() Settings settingsForIndex = Settings.settingsBuilder()
.put("index.number_of_shards", 2) .put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1) .put("index.number_of_replicas", 1)
.build(); .build();
final BulkTransportClient client = Clients.builder() final ExtendedTransportClient client = ClientBuilder.builder()
.put(getSettings()) .put(getSettings())
.put(Clients.MAX_ACTIONS_PER_REQUEST, maxactions) .put(, maxactions)
.put(Clients.FLUSH_INTERVAL, TimeValue.timeValueSeconds(60)) // = disable autoflush for this test .put(, TimeValue.timeValueSeconds(60))
.setMetric(new SimpleBulkMetric()) .build();
.setControl(new SimpleBulkControl())
try { try {
client.newIndex("test", settingsForIndex, null) client.newIndex("test", settingsForIndex, null)
.startBulk("test", -1, 1000); .startBulk("test", -1, 1000);
ThreadPoolExecutor pool = ThreadPoolExecutor pool = EsExecutors.newFixed("bulkclient-test", maxthreads, 30,
EsExecutors.newFixed("bulkclient-test", maxthreads, 30, EsExecutors.daemonThreadFactory("bulkclient-test")); EsExecutors.daemonThreadFactory("bulkclient-test"));
final CountDownLatch latch = new CountDownLatch(maxthreads); final CountDownLatch latch = new CountDownLatch(maxthreads);
for (int i = 0; i < maxthreads; i++) { for (int i = 0; i < maxthreads; i++) {
pool.execute(() -> { pool.execute(() -> {
for (int i1 = 0; i1 < maxloop; i1++) { for (int i1 = 0; i1 < maxloop; i1++) {
client.index("test", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
latch.countDown(); latch.countDown();
}); });
} }"waiting for max 30 seconds...");"waiting for latch...");
latch.await(30, TimeUnit.SECONDS); if (latch.await(60, TimeUnit.SECONDS)) {"client flush ...");"flush ...");
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");"thread pool to be shut down ...");"pool to be shut down ...");
pool.shutdown(); pool.shutdown();"poot shut down");"poot shut down");
} catch (NoNodeAvailableException e) { } catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available"); logger.warn("skipping, no node available");
} finally { } finally {
client.stopBulk("test"); client.stopBulk("test");
assertEquals(maxthreads * maxloop, client.getMetric().getSucceeded().getCount()); assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) { if (client.hasThrowable()) {
logger.error("error", client.getThrowable()); logger.error("error", client.getThrowable());
} }
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
client.refreshIndex("test"); client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
// to avoid NPE at // to avoid NPE at
.setIndices("_all") .setIndices("_all")
.setQuery(QueryBuilders.matchAllQuery()) .setQuery(QueryBuilders.matchAllQuery())
@ -197,5 +184,4 @@ public class BulkTransportClientTest extends NodeTestUtils {
client.shutdown(); client.shutdown();
} }
} }
} }

@ -0,0 +1,57 @@
package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
public class ExtendedTransportDuplicateIDTest extends NodeTestUtils {
private final static Logger logger = LogManager.getLogger(ExtendedTransportDuplicateIDTest.class.getSimpleName());
private final static Long MAX_ACTIONS_PER_REQUEST = 1000L;
private final static Long ACTIONS = 12345L;
public void testDuplicateDocIDs() throws Exception {
long numactions = ACTIONS;
final ExtendedTransportClient client = ClientBuilder.builder()
try {
for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();"hits = {}", hits);
assertTrue(hits < ACTIONS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

@ -0,0 +1,65 @@
package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertFalse;
public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportIndexAliasTest.class.getSimpleName());
public void testIndexAlias() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
try {
for (int i = 0; i < 1; i++) {
client.index("test1234", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.switchAliases("test", "test1234", simpleAliases);
for (int i = 0; i < 1; i++) {
client.index("test5678", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
simpleAliases = Arrays.asList("d", "e", "f");
client.switchAliases("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> aliases = client.getIndexFilters("test5678");"aliases of index test5678 = {}", aliases);
aliases = client.getAliasFilters("test");"aliases of alias test = {}", aliases);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

@ -1,5 +1,7 @@
package org.xbib.elasticsearch.extras.client.transport; package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
@ -9,15 +11,10 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import; import;
import; import;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.indexing.IndexingStats;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import java.util.Map; import java.util.Map;
@ -28,9 +25,9 @@ import static org.junit.Assert.assertFalse;
/** /**
* *
*/ */
public class BulkTransportReplicaTest extends NodeTestUtils { public class ExtendedTransportReplicaTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkTransportReplicaTest.class.getSimpleName()); private static final Logger logger = LogManager.getLogger(ExtendedTransportReplicaTest.class.getSimpleName());
@Test @Test
public void testReplicaLevel() throws Exception { public void testReplicaLevel() throws Exception {
@ -50,20 +47,20 @@ public class BulkTransportReplicaTest extends NodeTestUtils {
.put("index.number_of_replicas", 1) .put("index.number_of_replicas", 1)
.build(); .build();
final BulkTransportClient client = Clients.builder() final ExtendedTransportClient client = ClientBuilder.builder()
.put(getSettings()) .put(getSettings())
.setMetric(new SimpleBulkMetric()) .build();
.setControl(new SimpleBulkControl())
try { try {
client.newIndex("test1", settingsTest1, null) client.newIndex("test1", settingsTest1, null)
.newIndex("test2", settingsTest2, null); .newIndex("test2", settingsTest2, null);
client.waitForCluster("GREEN", "30s"); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
client.index("test1", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test1", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
client.index("test2", "test", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("test2", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");
@ -73,13 +70,13 @@ public class BulkTransportReplicaTest extends NodeTestUtils {"refreshing");"refreshing");
client.refreshIndex("test1"); client.refreshIndex("test1");
client.refreshIndex("test2"); client.refreshIndex("test2");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE) SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test1", "test2") .setIndices("test1", "test2")
.setQuery(matchAllQuery()); .setQuery(matchAllQuery());
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits(); long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();"query total hits={}", hits);"query total hits={}", hits);
assertEquals(2468, hits); assertEquals(2468, hits);
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.client(), IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.getClient(),
IndicesStatsAction.INSTANCE).all(); IndicesStatsAction.INSTANCE).all();
IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet(); IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet();
for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) { for (Map.Entry<String, IndexStats> m : response.getIndices().entrySet()) {

@ -1,29 +1,23 @@
package org.xbib.elasticsearch.extras.client.node; package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.junit.Test; import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils; import org.xbib.elx.common.ClientBuilder;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
/** public class ExtendedTransportUpdateReplicaLevelTest extends NodeTestUtils {
public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeUpdateReplicaLevelTest.class.getSimpleName()); private static final Logger logger = LogManager.getLogger(ExtendedTransportUpdateReplicaLevelTest.class.getSimpleName());
@Test @Test
public void testUpdateReplicaLevel() throws Exception { public void testUpdateReplicaLevel() throws Exception {
int numberOfShards = 2; long numberOfShards = 2;
int replicaLevel = 3; int replicaLevel = 3;
// we need 3 nodes for replica level 3 // we need 3 nodes for replica level 3
@ -32,21 +26,21 @@ public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils {
int shardsAfterReplica; int shardsAfterReplica;
final ExtendedTransportClient client = ClientBuilder.builder()
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("index.number_of_shards", numberOfShards) .put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.build(); .build();
final BulkNodeClient client = Clients.builder()
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
try { try {
client.newIndex("replicatest", settings, null); client.newIndex("replicatest", settings, null);
client.waitForCluster("GREEN", "30s"); client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) { for (int i = 0; i < 12345; i++) {
client.index("replicatest", "replicatest", null, "{ \"name\" : \"" + randomString(32) + "\"}"); client.index("replicatest", "replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
} }
client.flushIngest(); client.flushIngest();
client.waitForResponses("30s"); client.waitForResponses("30s");
@ -62,5 +56,4 @@ public class BulkNodeUpdateReplicaLevelTest extends NodeTestUtils {
assertFalse(client.hasThrowable()); assertFalse(client.hasThrowable());
} }
} }
} }

@ -1,7 +1,7 @@
package org.xbib.elasticsearch; package org.xbib.elx.transport;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
import; import;
import; import;
@ -10,8 +10,6 @@ import;
import; import;
import; import;
import; import;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -19,22 +17,25 @@ import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.xbib.elasticsearch.extras.client.NetworkUtils; import org.xbib.elx.common.util.NetworkUtils;
import; import;
import java.nio.file.*; import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class NodeTestUtils { public class NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger("test"); private static final Logger logger = LogManager.getLogger("test");
private static Random random = new Random(); private static Random random = new Random();
@ -53,7 +54,7 @@ public class NodeTestUtils {
private int port; private int port;
private static void deleteFiles() throws IOException { private static void deleteFiles() throws IOException {
Path directory = Paths.get(System.getProperty("path.home") + "/data"); Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() { Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override @Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
@ -148,11 +149,11 @@ public class NodeTestUtils {
.build(); .build();
} }
protected String getHome() { protected static String getHome() {
return System.getProperty("path.home"); return System.getProperty("path.home", System.getProperty("user.dir"));
} }
public void startNode(String id) throws IOException { public void startNode(String id) {
buildNode(id).start(); buildNode(id).start();
} }
@ -160,7 +161,7 @@ public class NodeTestUtils {
return clients.get(id); return clients.get(id);
} }
private void closeNodes() throws IOException { private void closeNodes() {"closing all clients");"closing all clients");
for (AbstractClient client : clients.values()) { for (AbstractClient client : clients.values()) {
client.close(); client.close();
@ -188,7 +189,7 @@ public class NodeTestUtils {
} }
} }
private Node buildNode(String id) throws IOException { private Node buildNode(String id) {
Settings nodeSettings = settingsBuilder() Settings nodeSettings = settingsBuilder()
.put(getNodeSettings()) .put(getNodeSettings())
.put("name", id) .put("name", id)

@ -0,0 +1 @@
View file

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="[%d{ISO8601}][%-5p][%-25c][%t] %m%n"/>
<Root level="info">
<AppenderRef ref="Console" />

@ -1,3 +1,19 @@
group = org.xbib group = org.xbib
name = elasticsearch-extras-client name = elx
version = version =
xbib-metrics.version = 1.1.0
xbib-guice.version = 4.0.4
elasticsearch.version = 2.2.1
jna.version = 4.5.2
log4j.version = 2.11.1
mustache.version = 0.9.5
jts.version = 1.13
jackson-dataformat.version = 2.8.11
junit.version = 4.12
wagon.version = 3.0.0
asciidoclet.version = 1.5.4
org.gradle.warning.mode = all

@ -1,8 +0,0 @@
ext {
user = 'xbib'
name = 'elasticsearch-extras-client'
description = 'Some extras implemented for using Elasticsearch clients (node and transport)'
scmUrl = '' + user + '/' + name
scmConnection = 'scm:git:git://' + user + '/' + name + '.git'
scmDeveloperConnection = 'scm:git:git://' + user + '/' + name + '.git'

View file

@ -1,12 +1,19 @@
ext {
description = 'Extensions for Elasticsearch clients (node and transport)'
scmUrl = ''
scmConnection = 'scm:git:git://'
scmDeveloperConnection = 'scm:git:git://'
task xbibUpload(type: Upload) { task xbibUpload(type: Upload, dependsOn: build) {
group = 'publish'
configuration = configurations.archives configuration = configurations.archives
uploadDescriptor = true uploadDescriptor = true
repositories { repositories {
if (project.hasProperty("xbibUsername")) { if (project.hasProperty("xbibUsername")) {
mavenDeployer { mavenDeployer {
configuration = configurations.wagon configuration = configurations.wagon
repository(url: 'scpexe://') { repository(url: uri('xbibUrl'))) {
authentication(userName: xbibUsername, privateKey: xbibPrivateKey) authentication(userName: xbibUsername, privateKey: xbibPrivateKey)
} }
} }
@ -14,7 +21,8 @@ task xbibUpload(type: Upload) {
} }
} }
task sonaTypeUpload(type: Upload) { task sonaTypeUpload(type: Upload, dependsOn: build) {
group = 'publish'
configuration = configurations.archives configuration = configurations.archives
uploadDescriptor = true uploadDescriptor = true
repositories { repositories {
@ -34,7 +42,7 @@ task sonaTypeUpload(type: Upload) {
name name
description description description description
packaging 'jar' packaging 'jar'
inceptionYear '2012' inceptionYear '2019'
url scmUrl url scmUrl
organization { organization {
name 'xbib' name 'xbib'
@ -42,7 +50,7 @@ task sonaTypeUpload(type: Upload) {
} }
developers { developers {
developer { developer {
id user id 'xbib'
name 'Jörg Prante' name 'Jörg Prante'
email '' email ''
url '' url ''
@ -64,3 +72,7 @@ task sonaTypeUpload(type: Upload) {
} }
} }
} }
nexusStaging {
packageGroup = "org.xbib"

@ -1,6 +1,6 @@
#Tue Jan 03 14:13:22 CET 2017 #Fri Feb 15 11:59:10 CET 2019
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\:// distributionUrl=https\://

View file

@ -28,7 +28,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"` APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
# Use the maximum available, or set MAX_FD != -1 to use that value. # Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum" MAX_FD="maximum"

View file

@ -14,7 +14,7 @@ set APP_BASE_NAME=%~n0
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
@rem Find java.exe @rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome if defined JAVA_HOME goto findJavaFromJavaHome

@ -1 +1,5 @@ = 'elasticsearch-extras-client' include 'elx-api'
include 'elx-common'
include 'elx-node'
include 'elx-transport'
include 'elx-http'

@ -1,4 +0,0 @@
* Classes to support Elasticsearch node creation.
package org.elasticsearch.node;

@ -1,70 +0,0 @@
package org.xbib.elasticsearch;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
public class SearchTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger("test");
public void testSearch() throws Exception {
Client client = client("1");
long t0 = System.currentTimeMillis();
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1000; i++) {
.field("user1", "kimchy")
.field("user2", "kimchy")
.field("user3", "kimchy")
.field("user4", "kimchy")
.field("user5", "kimchy")
.field("user6", "kimchy")
.field("user7", "kimchy")
.field("user8", "kimchy")
.field("user9", "kimchy")
.field("rowcount", i)
.field("rs", 1234)));
long t1 = System.currentTimeMillis();"t1-t0 = {}", t1 - t0);
for (int i = 0; i < 100; i++) {
t1 = System.currentTimeMillis();
QueryBuilder queryStringBuilder =
QueryBuilders.queryStringQuery("rs:" + 1234);
SearchRequestBuilder requestBuilder = client.prepareSearch()
.addSort("rowcount", SortOrder.DESC)
.setFrom(i * 10).setSize(10);
SearchResponse response = requestBuilder.execute().actionGet();
long t2 = System.currentTimeMillis();"t2-t1 = {}", t2 - t1);

@ -1,69 +0,0 @@
package org.xbib.elasticsearch;
import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.junit.Test;
public class WildcardTest extends NodeTestUtils {
protected Settings getNodeSettings() {
return settingsBuilder()
.put("", getClusterName())
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", false)
.put("http.enabled", false)
.put("path.home", System.getProperty("path.home"))
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
public void testWildcard() throws Exception {
index(client("1"), "1", "010");
index(client("1"), "2", "0*0");
// exact
validateCount(client("1"), queryStringQuery("010").defaultField("field"), 1);
validateCount(client("1"), queryStringQuery("0\\*0").defaultField("field"), 1);
// pattern
validateCount(client("1"), queryStringQuery("0*0").defaultField("field"), 1); // 2?
validateCount(client("1"), queryStringQuery("0?0").defaultField("field"), 1); // 2?
validateCount(client("1"), queryStringQuery("0**0").defaultField("field"), 1); // 2?
validateCount(client("1"), queryStringQuery("0??0").defaultField("field"), 0);
validateCount(client("1"), queryStringQuery("*10").defaultField("field"), 1);
validateCount(client("1"), queryStringQuery("*1*").defaultField("field"), 1);
validateCount(client("1"), queryStringQuery("*\\*0").defaultField("field"), 0); // 1?
validateCount(client("1"), queryStringQuery("*\\**").defaultField("field"), 0); // 1?
private void index(Client client, String id, String fieldValue) throws IOException {
.source(jsonBuilder().startObject().field("field", fieldValue).endObject())
private long count(Client client, QueryBuilder queryBuilder) {
return client.prepareSearch("index").setTypes("type")
private void validateCount(Client client, QueryBuilder queryBuilder, long expectedHits) {
final long actualHits = count(client, queryBuilder);
if (actualHits != expectedHits) {
throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits);

@ -1,62 +0,0 @@
package org.xbib.elasticsearch.extras.client.node;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
public class BulkNodeDuplicateIDTest extends NodeTestUtils {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkNodeDuplicateIDTest.class.getSimpleName());
private static final Long MAX_ACTIONS = 1000L;
private static final Long NUM_ACTIONS = 12345L;
public void testDuplicateDocIDs() throws Exception {
long numactions = NUM_ACTIONS;
final BulkNodeClient client = Clients.builder()
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
try {
for (int i = 0; i < NUM_ACTIONS; i++) {
client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE)
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();"hits = {}", hits);
assertTrue(hits < NUM_ACTIONS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

@ -1,4 +0,0 @@
* Classes for testing Elasticsearch node client extras.
package org.xbib.elasticsearch.extras.client.node;

@ -1,4 +0,0 @@
* Classes to test Elasticsearch clients.
package org.xbib.elasticsearch.extras.client;

@ -1,60 +0,0 @@
package org.xbib.elasticsearch.extras.client.transport;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.junit.Test;
import org.xbib.elasticsearch.NodeTestUtils;
import org.xbib.elasticsearch.extras.client.Clients;
import org.xbib.elasticsearch.extras.client.SimpleBulkControl;
import org.xbib.elasticsearch.extras.client.SimpleBulkMetric;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.*;
public class BulkTransportDuplicateIDTest extends NodeTestUtils {
private final static ESLogger logger = ESLoggerFactory.getLogger(BulkTransportDuplicateIDTest.class.getSimpleName());
private final static Long MAX_ACTIONS = 1000L;
private final static Long NUM_ACTIONS = 12345L;
public void testDuplicateDocIDs() throws Exception {
long numactions = NUM_ACTIONS;
final BulkTransportClient client = Clients.builder()
.setMetric(new SimpleBulkMetric())
.setControl(new SimpleBulkControl())
try {
for (int i = 0; i < NUM_ACTIONS; i++) {
client.index("test", "test", randomString(1), "{ \"name\" : \"" + randomString(32) + "\"}");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.client(), SearchAction.INSTANCE)
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();"hits = {}", hits);
assertTrue(hits < NUM_ACTIONS);
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(numactions, client.getMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

