first version of elx for Elasticsearch 6.3.2

This commit is contained in:
Jörg Prante 2019-02-22 11:15:22 +01:00
parent 0ba6b6d99d
commit fa84c61336
226 changed files with 6608 additions and 17962 deletions

language: java
sudo: required
- oraclejdk8
- openjdk11
- $HOME/.m2

dependencies {
compile "io.netty:netty-buffer:${'netty.version')}"
compile "io.netty:netty-codec-http:${'netty.version')}"
compile "io.netty:netty-handler:${'netty.version')}"
compile "org.xbib.elasticsearch:elasticsearch:${'elasticsearch-server.version')}"
jar {
baseName "${}-api"

File diff suppressed because it is too large Load diff

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
buildscript {
repositories {
maven {
url ''
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:"
plugins {
id "org.sonarqube" version "2.6.1"
id "" version "0.11.0"
id "com.github.spotbugs" version "1.6.9"
id "org.xbib.gradle.plugin.asciidoctor" version ""
printf "Date: %s\nHost: %s\nOS: %s %s %s\nJava: %s %s %s %s\nGradle: %s Groovy: %s Java: %s\n" +
printf "Host: %s\nOS: %s %s %s\nJava: %s %s %s %s\nGradle: %s Groovy: %s Java: %s\n" +
"Build: group: ${} name: ${} version: ${project.version}\n",,
@ -33,31 +19,28 @@ printf "Date: %s\nHost: %s\nOS: %s %s %s\nJava: %s %s %s %s\nGradle: %s Groovy:
gradle.gradleVersion, GroovySystem.getVersion(), JavaVersion.current()
apply plugin: ""
apply plugin: 'org.xbib.gradle.plugin.asciidoctor'
ext {
user = 'jprante'
name = 'elx'
description = 'Elasticsearch extensions'
scmUrl = '' + user + '/' + name
scmConnection = 'scm:git:git://' + user + '/' + name + '.git'
scmDeveloperConnection = 'scm:git:git://' + user + '/' + name + '.git'
if (JavaVersion.current() < JavaVersion.VERSION_11) {
throw new GradleException("This build must be run with java 11 or higher")
subprojects {
apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'signing'
apply plugin: 'com.github.spotbugs'
apply plugin: 'pmd'
apply plugin: 'checkstyle'
apply plugin: 'org.xbib.gradle.plugin.asciidoctor'
configurations {
dependencies {
alpnagent "org.mortbay.jetty.alpn:jetty-alpn-agent:${'alpnagent.version')}"
testCompile "junit:junit:${'junit.version')}"
testCompile "org.apache.logging.log4j:log4j-core:${'log4j.version')}"
testCompile "org.apache.logging.log4j:log4j-slf4j-impl:${'log4j.version')}"
asciidoclet "org.xbib:asciidoclet:${'asciidoclet.version')}"
wagon "org.apache.maven.wagon:wagon-ssh:${'wagon.version')}"
@ -71,10 +54,32 @@ subprojects {
targetCompatibility = JavaVersion.VERSION_11
jar {
baseName "${}-${}"
tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:all"
if (!options.compilerArgs.contains("-processor")) {
options.compilerArgs << '-proc:none'
test {
jvmArgs =[
systemProperty 'jna.debug_load', 'true'
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
clean {
delete "data"
delete "logs"
delete "out"
/*javadoc {
options.docletpath = configurations.asciidoclet.files.asType(List)
options.doclet = 'org.xbib.asciidoclet.Asciidoclet'
@ -105,16 +110,76 @@ subprojects {
archives javadocJar, sourcesJar
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier 'javadoc'
task sourcesJar(type: Jar, dependsOn: classes) {
from sourceSets.main.allSource
classifier 'sources'
artifacts {
archives javadocJar, sourcesJar
if (project.hasProperty('signing.keyId')) {
signing {
sign configurations.archives
apply from: "${rootProject.projectDir}/gradle/ext.gradle"
apply from: "${rootProject.projectDir}/gradle/publish.gradle"
//apply from: "${rootProject.projectDir}/gradle/sonarqube.gradle"
spotbugs {
effort = "max"
reportLevel = "low"
//includeFilter = file("findbugs-exclude.xml")
tasks.withType(com.github.spotbugs.SpotBugsTask) {
ignoreFailures = true
reports {
xml.enabled = false
html.enabled = true
tasks.withType(Pmd) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
tasks.withType(Checkstyle) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
pmd {
toolVersion = '6.11.0'
ruleSets = ['category/java/bestpractices.xml']
checkstyle {
configFile = rootProject.file('config/checkstyle/checkstyle.xml')
ignoreFailures = true
showViolations = true
sonarqube {
properties {
property "sonar.projectName", "${} ${}"
property "sonar.sourceEncoding", "UTF-8"
property "sonar.tests", "src/test/java"
property "sonar.scm.provider", "git"
property "sonar.junit.reportsPath", "build/test-results/test/"
/*asciidoctor {

buildscript {
repositories {
maven {
url ''
dependencies {
classpath "org.xbib.elasticsearch:gradle-plugin-elasticsearch-build:"
apply plugin: ''
configurations {
dependencies {
compile project(':api')
compile "org.xbib:metrics:${'xbib-metrics.version')}"
compileOnly "org.apache.logging.log4j:log4j-api:${'log4j.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
testRuntime "org.xbib.elasticsearch:elasticsearch-test-framework:${'elasticsearch-devkit.version')}"
jar {
baseName "${}-common"
task testJar(type: Jar, dependsOn: testClasses) {
baseName = "${project.archivesBaseName}-tests"
from sourceSets.test.output
artifacts {
main jar
tests testJar
archives sourcesJar, javadocJar
test {
enabled = false
//jvmArgs "-javaagent:" + configurations.alpnagent.asPath
systemProperty 'path.home', project.buildDir.absolutePath
testLogging {
showStandardStreams = true
exceptionFormat = 'full'
randomizedTest {
enabled = false
esTest {
// test with the jars, not the classes, for security manager
// classpath = files(configurations.testRuntime) + configurations.main.artifacts.files + configurations.tests.artifacts.files
systemProperty '', 'true'
esTest.dependsOn jar, testJar
dependencyLicenses.enabled = false
// we not like to examine Netty
thirdPartyAudit.enabled = false

package org.xbib.elasticsearch.client;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public abstract class AbstractClient implements ClientMethods {
private static final Logger logger = LogManager.getLogger(AbstractClient.class.getName());
private Settings.Builder settingsBuilder;
private Settings settings;
private Map<String, String> mappings;
private ElasticsearchClient client;
protected BulkProcessor bulkProcessor;
protected BulkMetric metric;
protected BulkControl control;
protected Throwable throwable;
protected boolean closed;
protected int maxActionsPerRequest = DEFAULT_MAX_ACTIONS_PER_REQUEST;
protected int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
protected String maxVolumePerRequest = DEFAULT_MAX_VOLUME_PER_REQUEST;
protected String flushIngestInterval = DEFAULT_FLUSH_INTERVAL;
public AbstractClient init(ElasticsearchClient client, Settings settings,
final BulkMetric metric, final BulkControl control) {
this.client = client;
this.mappings = new HashMap<>();
if (settings == null) {
settings = findSettings();
if (client == null && settings != null) {
try {
this.client = createClient(settings);
} catch (IOException e) {
logger.error(e.getMessage(), e);
this.metric = metric;
this.control = control;
if (metric != null) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
private final Logger logger = LogManager.getLogger(getClass().getName() + ".Listener");
public void beforeBulk(long executionId, BulkRequest request) {
long l = -1;
if (metric != null) {
l = metric.getCurrentIngest().getCount();
int n = request.numberOfActions();
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = -1;
if (metric != null) {
l = metric.getCurrentIngest().getCount();
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (metric != null) {
metric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
if (itemResponse.isFailed()) {
if (metric != null) {
if (metric != null) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
if (n > 0) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
} else {
if (metric != null) {
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (metric != null) {
throwable = failure;
closed = true;
logger.error("after bulk [" + executionId + "] error", failure);
if (this.client != null) {
BulkProcessor.Builder builder = BulkProcessor.builder(this.client, listener)
.setFlushInterval(TimeValue.parseTimeValue(flushIngestInterval, "flushIngestInterval"));
if (maxVolumePerRequest != null) {
builder.setBulkSize(ByteSizeValue.parseBytesSizeValue(maxVolumePerRequest, "maxVolumePerRequest"));
this.bulkProcessor =;
this.closed = false;
return this;
protected abstract ElasticsearchClient createClient(Settings settings) throws IOException;
public ElasticsearchClient client() {
return client;
public ClientMethods maxActionsPerRequest(int maxActionsPerRequest) {
this.maxActionsPerRequest = maxActionsPerRequest;
return this;
public ClientMethods maxConcurrentRequests(int maxConcurrentRequests) {
this.maxConcurrentRequests = maxConcurrentRequests;
return this;
public ClientMethods maxVolumePerRequest(String maxVolumePerRequest) {
this.maxVolumePerRequest = maxVolumePerRequest;
return this;
public ClientMethods flushIngestInterval(String flushIngestInterval) {
this.flushIngestInterval = flushIngestInterval;
return this;
public BulkMetric getMetric() {
return metric;
public void resetSettings() {
this.settingsBuilder = Settings.builder();
settings = null;
mappings = new HashMap<>();
public void setSettings(Settings settings) {
this.settings = settings;
public void setting(String key, String value) {
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
settingsBuilder.put(key, value);
public void setting(String key, Boolean value) {
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
settingsBuilder.put(key, value);
public void setting(String key, Integer value) {
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
settingsBuilder.put(key, value);
public void setting(InputStream in) throws IOException {
settingsBuilder = Settings.builder().loadFromStream(".json", in, true);
public Settings.Builder settingsBuilder() {
return settingsBuilder != null ? settingsBuilder : Settings.builder();
public Settings settings() {
if (settings != null) {
return settings;
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
public void mapping(String type, String mapping) throws IOException {
mappings.put(type, mapping);
public void mapping(String type, InputStream in) throws IOException {
if (type == null) {
StringWriter sw = new StringWriter();
Streams.copy(new InputStreamReader(in, StandardCharsets.UTF_8), sw);
mappings.put(type, sw.toString());
public ClientMethods index(String index, String type, String id, boolean create, BytesReference source) {
return indexRequest(new IndexRequest(index).type(type).id(id).create(create).source(source, XContentType.JSON));
public ClientMethods index(String index, String type, String id, boolean create, String source) {
return indexRequest(new IndexRequest(index).type(type).id(id).create(create).source(source, XContentType.JSON));
public ClientMethods indexRequest(IndexRequest indexRequest) {
if (closed) {
try {
if (metric != null) {
metric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(),;
} catch (Exception e) {
throwable = e;
closed = true;
logger.error("bulk add of index request failed: " + e.getMessage(), e);
return this;
public ClientMethods delete(String index, String type, String id) {
return deleteRequest(new DeleteRequest(index).type(type).id(id));
public ClientMethods deleteRequest(DeleteRequest deleteRequest) {
if (closed) {
try {
if (metric != null) {
metric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(),;
} catch (Exception e) {
throwable = e;
closed = true;
logger.error("bulk add of delete failed: " + e.getMessage(), e);
return this;
public ClientMethods update(String index, String type, String id, BytesReference source) {
return updateRequest(new UpdateRequest().index(index).type(type).id(id).upsert(source, XContentType.JSON));
public ClientMethods update(String index, String type, String id, String source) {
return updateRequest(new UpdateRequest().index(index).type(type).id(id).upsert(source, XContentType.JSON));
public ClientMethods updateRequest(UpdateRequest updateRequest) {
if (closed) {
try {
if (metric != null) {
metric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(),;
} catch (Exception e) {
throwable = e;
closed = true;
logger.error("bulk add of update request failed: " + e.getMessage(), e);
return this;
public ClientMethods startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds)
throws IOException {
if (control == null) {
return this;
if (!control.isBulk(index) && startRefreshIntervalSeconds > 0L && stopRefreshIntervalSeconds > 0L) {
control.startBulk(index, startRefreshIntervalSeconds, stopRefreshIntervalSeconds);
updateIndexSetting(index, "refresh_interval", startRefreshIntervalSeconds + "s");
return this;
public ClientMethods stopBulk(String index) throws IOException {
if (control == null) {
return this;
if (control.isBulk(index)) {
long secs = control.getStopBulkRefreshIntervals().get(index);
if (secs > 0L) {
updateIndexSetting(index, "refresh_interval", secs + "s");
return this;
public ClientMethods flushIngest() {
if (closed) {
logger.debug("flushing bulk processor");
return this;
public synchronized void shutdown() throws IOException {
if (closed) {
if (bulkProcessor != null) {"closing bulk processor...");
if (metric != null) {"stopping metric");
if (control != null && control.indices() != null && !control.indices().isEmpty()) {"stopping bulk mode for indices {}...", control.indices());
for (String index : control.indices()) {
public ClientMethods newIndex(String index) {
if (closed) {
return newIndex(index, null, null);
public ClientMethods newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException {
mapping(type, mappings);
return newIndex(index, settings(), this.mappings);
public ClientMethods newIndex(String index, Settings settings, Map<String, String> mappings) {
if (closed) {
if (client() == null) {
logger.warn("no client for create index");
return this;
if (index == null) {
logger.warn("no index name given to create index");
return this;
CreateIndexRequestBuilder createIndexRequestBuilder =
new CreateIndexRequestBuilder(client(), CreateIndexAction.INSTANCE).setIndex(index);
if (settings != null) {"found settings {}", settings.toString());
if (mappings != null) {
for (Map.Entry<String, String> entry : mappings.entrySet()) {
String type = entry.getKey();
String mapping = entry.getValue();"found mapping for {}", type);
createIndexRequestBuilder.addMapping(type, mapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();"index {} created: {}", index, createIndexResponse);
return this;
public ClientMethods newMapping(String index, String type, Map<String, Object> mapping) {
PutMappingRequestBuilder putMappingRequestBuilder =
new PutMappingRequestBuilder(client(), PutMappingAction.INSTANCE)
putMappingRequestBuilder.execute().actionGet();"mapping created for index {} and type {}", index, type);
return this;
public ClientMethods deleteIndex(String index) {
if (closed) {
if (client == null) {
logger.warn("no client");
return this;
if (index == null) {
logger.warn("no index name given to delete index");
return this;
DeleteIndexRequestBuilder deleteIndexRequestBuilder =
new DeleteIndexRequestBuilder(client(), DeleteIndexAction.INSTANCE, index);
return this;
public ClientMethods waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException {
if (closed) {
long millis = TimeValue.parseTimeValue(maxWaitTime, "millis").getMillis();
while (!bulkProcessor.awaitClose(millis, TimeUnit.MILLISECONDS)) {
logger.warn("still waiting for responses");
return this;
public void waitForRecovery() throws IOException {
if (client() == null) {
client().execute(RecoveryAction.INSTANCE, new RecoveryRequest()).actionGet();
public int waitForRecovery(String index) throws IOException {
if (client() == null) {
return -1;
if (index == null) {
throw new IOException("unable to waitfor recovery, index not set");
RecoveryResponse response = client().execute(RecoveryAction.INSTANCE, new RecoveryRequest(index)).actionGet();
int shards = response.getTotalShards();
client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest(index)
return shards;
public void waitForCluster(String statusString, String timeout) throws IOException {
if (client() == null) {
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusString);
ClusterHealthResponse healthResponse =
client().execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest()
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ " and not " +
+ ", from here on, everything will fail!");
public String fetchClusterName() {
if (client() == null) {
return null;
try {
ClusterStateRequestBuilder clusterStateRequestBuilder =
new ClusterStateRequestBuilder(client(), ClusterStateAction.INSTANCE).all();
ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
String name = clusterStateResponse.getClusterName().value();
int nodeCount = clusterStateResponse.getState().getNodes().getSize();
return name + " (" + nodeCount + " nodes connected)";
} catch (ElasticsearchTimeoutException e) {
logger.warn(e.getMessage(), e);
return "TIMEOUT";
} catch (NoNodeAvailableException e) {
logger.warn(e.getMessage(), e);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return "[" + e.getMessage() + "]";
public String healthColor() {
if (client() == null) {
return null;
try {
ClusterHealthResponse healthResponse =
new ClusterHealthRequest().timeout(TimeValue.timeValueSeconds(30))).actionGet();
ClusterHealthStatus status = healthResponse.getStatus();
} catch (ElasticsearchTimeoutException e) {
logger.warn(e.getMessage(), e);
return "TIMEOUT";
} catch (NoNodeAvailableException e) {
logger.warn(e.getMessage(), e);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return "[" + e.getMessage() + "]";
public int updateReplicaLevel(String index, int level) throws IOException {
updateIndexSetting(index, "number_of_replicas", level);
return waitForRecovery(index);
public void flushIndex(String index) {
if (client() == null) {
if (index != null) {
client().execute(FlushAction.INSTANCE, new FlushRequest(index)).actionGet();
public void refreshIndex(String index) {
if (client() == null) {
if (index != null) {
client().execute(RefreshAction.INSTANCE, new RefreshRequest(index)).actionGet();
public void putMapping(String index) {
if (client() == null) {
if (!mappings.isEmpty()) {
for (Map.Entry<String, String> me : mappings.entrySet()) {
new PutMappingRequest(index).type(me.getKey()).source(me.getValue(), XContentType.JSON)).actionGet();
public String resolveAlias(String alias) {
if (client() == null) {
return alias;
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client(), GetAliasesAction.INSTANCE);
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
if (!getAliasesResponse.getAliases().isEmpty()) {
return getAliasesResponse.getAliases().keys().iterator().next().value;
return alias;
public String resolveMostRecentIndex(String alias) {
if (client() == null) {
return alias;
if (alias == null) {
return null;
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client(), GetAliasesAction.INSTANCE);
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> indices = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
Matcher m = pattern.matcher(indexName.value);
if (m.matches() && alias.equals( {
return indices.isEmpty() ? alias : indices.iterator().next();
public Map<String, String> getAliasFilters(String alias) {
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client(), GetAliasesAction.INSTANCE);
return getFilters(getAliasesRequestBuilder.setIndices(resolveAlias(alias)).execute().actionGet());
public Map<String, String> getIndexFilters(String index) {
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client(), GetAliasesAction.INSTANCE);
return getFilters(getAliasesRequestBuilder.setIndices(index).execute().actionGet());
public void switchAliases(String index, String concreteIndex, List<String> extraAliases) {
switchAliases(index, concreteIndex, extraAliases, null);
public void switchAliases(String index, String concreteIndex,
List<String> extraAliases, IndexAliasAdder adder) {
if (client() == null) {
if (index.equals(concreteIndex)) {
// two situations: 1. there is a new alias 2. there is already an old index with the alias
String oldIndex = resolveAlias(index);
final Map<String, String> oldFilterMap = oldIndex.equals(index) ? null : getIndexFilters(oldIndex);
final List<String> newAliases = new LinkedList<>();
final List<String> switchAliases = new LinkedList<>();
IndicesAliasesRequestBuilder requestBuilder = new IndicesAliasesRequestBuilder(client(), IndicesAliasesAction.INSTANCE);
if (oldFilterMap == null || !oldFilterMap.containsKey(index)) {
// never apply a filter for trunk index name
requestBuilder.addAlias(concreteIndex, index);
// switch existing aliases
if (oldFilterMap != null) {
for (Map.Entry<String, String> entry : oldFilterMap.entrySet()) {
String alias = entry.getKey();
String filter = entry.getValue();
requestBuilder.removeAlias(oldIndex, alias);
if (filter != null) {
requestBuilder.addAlias(concreteIndex, alias, filter);
} else {
requestBuilder.addAlias(concreteIndex, alias);
// a list of aliases that should be added, check if new or old
if (extraAliases != null) {
for (String extraAlias : extraAliases) {
if (oldFilterMap == null || !oldFilterMap.containsKey(extraAlias)) {
// index alias adder only active on extra aliases, and if alias is new
if (adder != null) {
adder.addIndexAlias(requestBuilder, concreteIndex, extraAlias);
} else {
requestBuilder.addAlias(concreteIndex, extraAlias);
} else {
String filter = oldFilterMap.get(extraAlias);
requestBuilder.removeAlias(oldIndex, extraAlias);
if (filter != null) {
requestBuilder.addAlias(concreteIndex, extraAlias, filter);
} else {
requestBuilder.addAlias(concreteIndex, extraAlias);
if (!newAliases.isEmpty() || !switchAliases.isEmpty()) {"new aliases = {}, switch aliases = {}", newAliases, switchAliases);
public void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep) {
if (client() == null) {
if (index.equals(concreteIndex)) {
GetIndexRequestBuilder getIndexRequestBuilder = new GetIndexRequestBuilder(client(), GetIndexAction.INSTANCE);
GetIndexResponse getIndexResponse = getIndexRequestBuilder.execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> indices = new TreeSet<>();"{} indices", getIndexResponse.getIndices().length);
for (String s : getIndexResponse.getIndices()) {
Matcher m = pattern.matcher(s);
if (m.matches() && index.equals( && !s.equals(concreteIndex)) {
if (indices.isEmpty()) {"no indices found, retention policy skipped");
if (mintokeep > 0 && indices.size() <= mintokeep) {"{} indices found, not enough for retention policy ({}), skipped",
indices.size(), mintokeep);
} else {"candidates for deletion = {}", indices);
List<String> indicesToDelete = new ArrayList<>();
// our index
Matcher m1 = pattern.matcher(concreteIndex);
if (m1.matches()) {
Integer i1 = Integer.parseInt(;
for (String s : indices) {
Matcher m2 = pattern.matcher(s);
if (m2.matches()) {
Integer i2 = Integer.parseInt(;
int kept = indices.size() - indicesToDelete.size();
if ((timestampdiff == 0 || (timestampdiff > 0 && i1 - i2 > timestampdiff)) && mintokeep <= kept) {
}"indices to delete = {}", indicesToDelete);
if (indicesToDelete.isEmpty()) {"not enough indices found to delete, retention policy complete");
String[] s = indicesToDelete.toArray(new String[indicesToDelete.size()]);
DeleteIndexRequestBuilder requestBuilder = new DeleteIndexRequestBuilder(client(), DeleteIndexAction.INSTANCE, s);
DeleteIndexResponse response = requestBuilder.execute().actionGet();
if (!response.isAcknowledged()) {
logger.warn("retention delete index operation was not acknowledged");
public Long mostRecentDocument(String index, String timestampfieldname) {
if (client() == null) {
return null;
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client(), SearchAction.INSTANCE);
SortBuilder<?> sort = SortBuilders.fieldSort(timestampfieldname).order(SortOrder.DESC);
SearchResponse searchResponse = searchRequestBuilder.setIndices(index)
if (searchResponse.getHits().getHits().length == 1) {
SearchHit hit = searchResponse.getHits().getHits()[0];
if (hit.getFields().get(timestampfieldname) != null) {
return hit.getFields().get(timestampfieldname).getValue();
} else {
return 0L;
return null;
public boolean hasThrowable() {
return throwable != null;
public Throwable getThrowable() {
return throwable;
protected static void throwClose() {
throw new ElasticsearchException("client is closed");
protected void updateIndexSetting(String index, String key, Object value) throws IOException {
if (client() == null) {
if (index == null) {
throw new IOException("no index name given");
if (key == null) {
throw new IOException("no key given");
if (value == null) {
throw new IOException("no value given");
Settings.Builder updateSettingsBuilder = Settings.builder();
updateSettingsBuilder.put(key, value.toString());
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index)
client().execute(UpdateSettingsAction.INSTANCE, updateSettingsRequest).actionGet();
private Map<String, String> getFilters(GetAliasesResponse getAliasesResponse) {
Map<String, String> result = new HashMap<>();
for (ObjectObjectCursor<String, List<AliasMetaData>> object : getAliasesResponse.getAliases()) {
List<AliasMetaData> aliasMetaDataList = object.value;
for (AliasMetaData aliasMetaData : aliasMetaDataList) {
if (aliasMetaData.filteringRequired()) {
String metaData = new String(aliasMetaData.getFilter().uncompressed(), StandardCharsets.UTF_8);
result.put(aliasMetaData.alias(), metaData);
} else {
result.put(aliasMetaData.alias(), null);
return result;
private Settings findSettings() {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put("host", "localhost");
try {
String hostname = NetworkUtils.getLocalAddress().getHostName();
logger.debug("the hostname is {}", hostname);
settingsBuilder.put("host", hostname)
.put("port", 9300);
} catch (Exception e) {
logger.warn(e.getMessage(), e);

package org.xbib.elasticsearch.client;
import java.util.Map;
import java.util.Set;
public interface BulkControl {
void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval);
boolean isBulk(String indexName);
void finishBulk(String indexName);
Set<String> indices();
Map<String, Long> getStartBulkRefreshIntervals();
Map<String, Long> getStopBulkRefreshIntervals();

package org.xbib.elasticsearch.client;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
public final class ClientBuilder implements Parameters {
private final Settings.Builder settingsBuilder;
private Map<Class<? extends ClientMethods>, ClientMethods> clientMethodsMap;
private BulkMetric metric;
private BulkControl control;
public ClientBuilder() {
public ClientBuilder(ClassLoader classLoader) {
this.settingsBuilder = Settings.builder();
//settingsBuilder.put("", "clientnode");
this.clientMethodsMap = new HashMap<>();
ServiceLoader<ClientMethods> serviceLoader = ServiceLoader.load(ClientMethods.class,
classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader());
for (ClientMethods clientMethods : serviceLoader) {
clientMethodsMap.put(clientMethods.getClass(), clientMethods);
public static ClientBuilder builder() {
return new ClientBuilder();
public ClientBuilder put(String key, String value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Integer value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Long value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Double value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, ByteSizeValue value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, TimeValue value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(Settings settings) {
return this;
public ClientBuilder setMetric(BulkMetric metric) {
this.metric = metric;
return this;
public ClientBuilder setControl(BulkControl control) {
this.control = control;
return this;
public <C extends ClientMethods> C getClient(Class<C> clientClass) {
return getClient(null, clientClass);
public <C extends ClientMethods> C getClient(Client client, Class<C> clientClass) {
Settings settings =;
return (C) clientMethodsMap.get(clientClass)
.flushIngestInterval(settings.get(FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL))
.init(client, settings, metric, control);

package org.xbib.elasticsearch.client;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
* Interface for providing convenient administrative methods for ingesting data into Elasticsearch.
public interface ClientMethods extends Parameters {
ClientMethods init(ElasticsearchClient client, Settings settings, BulkMetric metric, BulkControl control);
* Return Elasticsearch client.
* @return Elasticsearch client
ElasticsearchClient client();
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param index the index
* @param type the type
* @param id the id
* @param create true if document must be created
* @param source the source
* @return this
ClientMethods index(String index, String type, String id, boolean create, BytesReference source);
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param index the index
* @param type the type
* @param id the id
* @param create true if document must be created
* @param source the source
* @return this
ClientMethods index(String index, String type, String id, boolean create, String source);
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param indexRequest the index request to add
* @return this ingest
ClientMethods indexRequest(IndexRequest indexRequest);
* Delete document.
* @param index the index
* @param type the type
* @param id the id
* @return this ingest
ClientMethods delete(String index, String type, String id);
* Bulked delete request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param deleteRequest the delete request to add
* @return this ingest
ClientMethods deleteRequest(DeleteRequest deleteRequest);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
ClientMethods update(String index, String type, String id, BytesReference source);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
ClientMethods update(String index, String type, String id, String source);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param updateRequest the update request to add
* @return this ingest
ClientMethods updateRequest(UpdateRequest updateRequest);
* Set the maximum number of actions per request.
* @param maxActionsPerRequest maximum number of actions per request
* @return this ingest
ClientMethods maxActionsPerRequest(int maxActionsPerRequest);
* Set the maximum concurent requests.
* @param maxConcurentRequests maximum number of concurrent ingest requests
* @return this Ingest
ClientMethods maxConcurrentRequests(int maxConcurentRequests);
* Set the maximum volume for request before flush.
* @param maxVolume maximum volume
* @return this ingest
ClientMethods maxVolumePerRequest(String maxVolume);
* Set the flush interval for automatic flushing outstanding ingest requests.
* @param flushInterval the flush interval, default is 30 seconds
* @return this ingest
ClientMethods flushIngestInterval(String flushInterval);
* Set mapping.
* @param type mapping type
* @param in mapping definition as input stream
* @throws IOException if mapping could not be added
void mapping(String type, InputStream in) throws IOException;
* Set mapping.
* @param type mapping type
* @param mapping mapping definition as input stream
* @throws IOException if mapping could not be added
void mapping(String type, String mapping) throws IOException;
* Put mapping.
* @param index index
void putMapping(String index);
* Create a new index.
* @param index index
* @return this ingest
ClientMethods newIndex(String index);
* Create a new index.
* @param index index
* @param type type
* @param settings settings
* @param mappings mappings
* @return this ingest
* @throws IOException if new index creation fails
ClientMethods newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException;
* Create a new index.
* @param index index
* @param settings settings
* @param mappings mappings
* @return this ingest
ClientMethods newIndex(String index, Settings settings, Map<String, String> mappings);
* Create new mapping.
* @param index index
* @param type index type
* @param mapping mapping
* @return this ingest
ClientMethods newMapping(String index, String type, Map<String, Object> mapping);
* Delete index.
* @param index index
* @return this ingest
ClientMethods deleteIndex(String index);
* Start bulk mode.
* @param index index
* @param startRefreshIntervalSeconds refresh interval before bulk
* @param stopRefreshIntervalSeconds refresh interval after bulk
* @return this ingest
* @throws IOException if bulk could not be started
ClientMethods startBulk(String index, long startRefreshIntervalSeconds, long stopRefreshIntervalSeconds) throws IOException;
* Stops bulk mode.
* @param index index
* @return this Ingest
* @throws IOException if bulk could not be stopped
ClientMethods stopBulk(String index) throws IOException;
* Flush ingest, move all pending documents to the cluster.
* @return this
ClientMethods flushIngest();
* Wait for all outstanding responses.
* @param maxWaitTime maximum wait time
* @return this ingest
* @throws InterruptedException if wait is interrupted
* @throws ExecutionException if execution failed
ClientMethods waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException;
* Refresh the index.
* @param index index
void refreshIndex(String index);
* Flush the index.
* @param index index
void flushIndex(String index);
* Update replica level.
* @param index index
* @param level the replica level
* @return number of shards after updating replica level
* @throws IOException if replica could not be updated
int updateReplicaLevel(String index, int level) throws IOException;
* Wait for cluster being healthy.
* @param healthColor cluster health color to wait for
* @param timeValue time value
* @throws IOException if wait failed
void waitForCluster(String healthColor, String timeValue) throws IOException;
* Get current health color.
* @return the cluster health color
String healthColor();
* Wait for index recovery (after replica change).
* @param index index
* @return number of shards found
* @throws IOException if wait failed
int waitForRecovery(String index) throws IOException;
* Resolve alias.
* @param alias the alias
* @return one index name behind the alias or the alias if there is no index
String resolveAlias(String alias);
* Resolve alias to all connected indices, sort index names with most recent timestamp on top, return this index
* name.
* @param alias the alias
* @return the most recent index name pointing to the alias
String resolveMostRecentIndex(String alias);
* Get all alias filters.
* @param index index
* @return map of alias filters
Map<String, String> getAliasFilters(String index);
* Switch aliases from one index to another.
* @param index the index name
* @param concreteIndex the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
void switchAliases(String index, String concreteIndex, List<String> extraAliases);
* Switch aliases from one index to another.
* @param index the index name
* @param concreteIndex the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
* @param adder an adder method to create alias term queries
void switchAliases(String index, String concreteIndex, List<String> extraAliases, IndexAliasAdder adder);
* Retention policy for an index. All indices before timestampdiff should be deleted,
* but mintokeep indices must be kept.
* @param index index name
* @param concreteIndex index name with timestamp
* @param timestampdiff timestamp delta (for index timestamps)
* @param mintokeep minimum number of indices to keep
void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep);
* Find the timestamp of the most recently indexed document in the index.
* @param index the index name
* @param timestampfieldname the timestamp field name
* @return millis UTC millis of the most recent document
* @throws IOException if most rcent document can not be found
Long mostRecentDocument(String index, String timestampfieldname) throws IOException;
* Get metric.
* @return metric
BulkMetric getMetric();
* Returns true is a throwable exists.
* @return true if a Throwable exists
boolean hasThrowable();
* Return last throwable if exists.
* @return last throwable
Throwable getThrowable();
* Shutdown the ingesting.
* @throws IOException is shutdown fails
void shutdown() throws IOException;

package org.xbib.elasticsearch.client;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
public interface IndexAliasAdder {
void addIndexAlias(IndicesAliasesRequestBuilder builder, String index, String alias);

@ -1,20 +0,0 @@
package org.xbib.elasticsearch.client;
public interface Parameters {
int DEFAULT_MAX_CONCURRENT_REQUESTS = Runtime.getRuntime().availableProcessors();
String MAX_ACTIONS_PER_REQUEST = "max_actions_per_request";
String MAX_CONCURRENT_REQUESTS = "max_concurrent_requests";
String MAX_VOLUME_PER_REQUEST = "max_volume_per_request";
String FLUSH_INTERVAL = "flush_interval";

package org.xbib.elasticsearch.client;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class SimpleBulkControl implements BulkControl {
private final Set<String> indexNames = new HashSet<>();
private final Map<String, Long> startBulkRefreshIntervals = new HashMap<>();
private final Map<String, Long> stopBulkRefreshIntervals = new HashMap<>();
public void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval) {
synchronized (indexNames) {
startBulkRefreshIntervals.put(indexName, startRefreshInterval);
stopBulkRefreshIntervals.put(indexName, stopRefreshInterval);
public boolean isBulk(String indexName) {
return indexNames.contains(indexName);
public void finishBulk(String indexName) {
synchronized (indexNames) {
public Set<String> indices() {
return indexNames;
public Map<String, Long> getStartBulkRefreshIntervals() {
return startBulkRefreshIntervals;
public Map<String, Long> getStopBulkRefreshIntervals() {
return stopBulkRefreshIntervals;

* Classes for Elasticsearch client.
package org.xbib.elasticsearch.client;

package org.xbib.elasticsearch.client.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.testframework.ESSingleNodeTestCase;
public class SearchTests extends ESSingleNodeTestCase {
private static final Logger logger = LogManager.getLogger(SearchTests.class.getName());
public void testSearch() throws Exception {
long t0 = System.currentTimeMillis();
BulkRequestBuilder builder = new BulkRequestBuilder(client(), BulkAction.INSTANCE);
for (int i = 0; i < 1000; i++) {
.field("user1", "kimchy")
.field("user2", "kimchy")
.field("user3", "kimchy")
.field("user4", "kimchy")
.field("user5", "kimchy")
.field("user6", "kimchy")
.field("user7", "kimchy")
.field("user8", "kimchy")
.field("user9", "kimchy")
.field("rowcount", i)
.field("rs", 1234)
long t1 = System.currentTimeMillis();"t1-t0 = {}", t1 - t0);
for (int i = 0; i < 100; i++) {
t1 = System.currentTimeMillis();
QueryBuilder queryStringBuilder =
QueryBuilders.queryStringQuery("rs:" + 1234);
SearchRequestBuilder requestBuilder = client().prepareSearch()
.addSort("rowcount", SortOrder.DESC)
.setFrom(i * 10).setSize(10);
SearchResponse response = requestBuilder.execute().actionGet();
long t2 = System.currentTimeMillis();"t2-t1 = {}", t2 - t1);

package org.xbib.elasticsearch.client.common;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.testframework.ESSingleNodeTestCase;
public class WildcardTests extends ESSingleNodeTestCase {
public void testWildcard() throws Exception {
index("1", "010");
index("2", "0*0");
// exact
validateCount(QueryBuilders.queryStringQuery("010").defaultField("field"), 1);
validateCount(QueryBuilders.queryStringQuery("0\\*0").defaultField("field"), 1);
// pattern
validateCount(QueryBuilders.queryStringQuery("0*0").defaultField("field"), 1); // 2?
validateCount(QueryBuilders.queryStringQuery("0?0").defaultField("field"), 1); // 2?
validateCount(QueryBuilders.queryStringQuery("0**0").defaultField("field"), 1); // 2?
validateCount(QueryBuilders.queryStringQuery("0??0").defaultField("field"), 0);
validateCount(QueryBuilders.queryStringQuery("*10").defaultField("field"), 1);
validateCount(QueryBuilders.queryStringQuery("*1*").defaultField("field"), 1);
validateCount(QueryBuilders.queryStringQuery("*\\*0").defaultField("field"), 0); // 1?
validateCount(QueryBuilders.queryStringQuery("*\\**").defaultField("field"), 0); // 1?
private void index(String id, String fieldValue) throws IOException {
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject())
private void validateCount(QueryBuilder queryBuilder, long expectedHits) {
final long actualHits = count(queryBuilder);
if (actualHits != expectedHits) {
throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits);
private long count(QueryBuilder queryBuilder) {
return client().prepareSearch("index").setTypes("type")

* Classes to test Elasticsearch clients.
package org.xbib.elasticsearch.client.common;

@ -0,0 +1,4 @@
dependencies {
compile "org.xbib:metrics:${'xbib-metrics.version')}"
compile "org.xbib.elasticsearch:elasticsearch:${'elasticsearch-server.version')}"

View file

package org.xbib.elx.api;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.TimeUnit;
public interface BulkController extends Closeable, Flushable {
void init(Settings settings);
Throwable getLastBulkError();
void startBulkMode(IndexDefinition indexDefinition) throws IOException;
void startBulkMode(String indexName, long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException;
void index(IndexRequest indexRequest);
void delete(DeleteRequest deleteRequest);
void update(UpdateRequest updateRequest);
boolean waitForResponses(long timeout, TimeUnit timeUnit);
void stopBulkMode(IndexDefinition indexDefinition) throws IOException;
void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException;

View file

@ -1,9 +1,14 @@
package org.xbib.elasticsearch.client;
package org.xbib.elx.api;
import org.elasticsearch.common.settings.Settings;
import org.xbib.metrics.Count;
import org.xbib.metrics.Metered;
public interface BulkMetric {
public interface BulkMetric extends Closeable {
void init(Settings settings);
Metered getTotalIngest();
@ -19,9 +24,9 @@ public interface BulkMetric {
Count getFailed();
long elapsed();
void start();
void stop();
long elapsed();

View file

package org.xbib.elx.api;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import java.util.concurrent.TimeUnit;
public interface BulkProcessor extends Closeable, Flushable {
BulkProcessor add(ActionRequest request);
BulkProcessor add(ActionRequest request, Object payload);
boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
interface BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
* A listener for the execution.
public interface Listener {
* Callback before the bulk is executed.
* @param executionId execution ID
* @param request request
void beforeBulk(long executionId, BulkRequest request);
* Callback after a successful execution of bulk request.
* @param executionId execution ID
* @param request request
* @param response response
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
* Callback after a failed execution of bulk request.
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
* @param executionId execution ID
* @param request request
* @param failure failure
void afterBulk(long executionId, BulkRequest request, Throwable failure);

View file

package org.xbib.elx.api;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
* Interface for extended managing and indexing methods of an Elasticsearch client.
public interface ExtendedClient extends Flushable, Closeable {
* Set an Elasticsearch client to extend from it. May be null for TransportClient.
* @param client client
* @return this client
ExtendedClient setClient(ElasticsearchClient client);
* Return Elasticsearch client.
* @return Elasticsearch client
ElasticsearchClient getClient();
* Get bulk metric.
* @return the bulk metric
BulkMetric getBulkMetric();
* Get buulk control.
* @return the bulk control
BulkController getBulkController();
* Initiative the extended client, the bulk metric and bulk controller,
* creates instances and connect to cluster, if required.
* @param settings settings
* @return this client
* @throws IOException if init fails
ExtendedClient init(Settings settings) throws IOException;
* Build index definition from settings.
* @param index the index name
* @param settings the settings for the index
* @return index definition
* @throws IOException if settings/mapping URL is invalid/malformed
IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException;
* Add index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded.
* @param index the index
* @param id the id
* @param create true if document must be created
* @param source the source
* @return this
ExtendedClient index(String index, String id, boolean create, BytesReference source);
* Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded.
* @param index the index
* @param id the id
* @param create true if document is to be created, false otherwise
* @param source the source
* @return this client methods
ExtendedClient index(String index, String id, boolean create, String source);
* Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param indexRequest the index request to add
* @return this
ExtendedClient index(IndexRequest indexRequest);
* Delete request.
* @param index the index
* @param id the id
* @return this
ExtendedClient delete(String index, String id);
* Delete request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* @param deleteRequest the delete request to add
* @return this
ExtendedClient delete(DeleteRequest deleteRequest);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param index the index
* @param id the id
* @param source the source
* @return this
ExtendedClient update(String index, String id, BytesReference source);
* Update document. Use with precaution! Does not work in all cases.
* @param index the index
* @param id the id
* @param source the source
* @return this
ExtendedClient update(String index, String id, String source);
* Bulked update request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Note that updates only work correctly when all operations between nodes are synchronized.
* @param updateRequest the update request to add
* @return this
ExtendedClient update(UpdateRequest updateRequest);
* Create a new index.
* @param index index
* @return this
* @throws IOException if new index creation fails
ExtendedClient newIndex(String index) throws IOException;
* Create a new index.
* @param index index
* @param settings settings
* @param mapping mapping
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException;
* Create a new index.
* @param index index
* @param settings settings
* @return this
* @throws IOException if settings is invalid or index creation fails
ExtendedClient newIndex(String index, Settings settings) throws IOException;
* Create a new index.
* @param index index
* @param settings settings
* @param mapping mapping
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException;
* Create a new index.
* @param index index
* @param settings settings
* @param mapping mapping
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException;
* Create a new index.
* @param indexDefinition the index definition
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException;
* Delete an index.
* @param indexDefinition the index definition
* @return this
ExtendedClient deleteIndex(IndexDefinition indexDefinition);
* Delete an index.
* @param index index
* @return this
ExtendedClient deleteIndex(String index);
* Start bulk mode for indexes.
* @param indexDefinition index definition
* @return this
* @throws IOException if bulk could not be started
ExtendedClient startBulk(IndexDefinition indexDefinition) throws IOException;
* Start bulk mode.
* @param index index
* @param startRefreshIntervalSeconds refresh interval before bulk
* @param stopRefreshIntervalSeconds refresh interval after bulk
* @return this
* @throws IOException if bulk could not be started
ExtendedClient startBulk(String index, long startRefreshIntervalSeconds,
long stopRefreshIntervalSeconds) throws IOException;
* Stop bulk mode.
* @param indexDefinition index definition
* @return this
* @throws IOException if bulk could not be startet
ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException;
* Stops bulk mode.
* @param index index
* @param timeout maximum wait time
* @param timeUnit time unit for timeout
* @return this
* @throws IOException if bulk could not be stopped
ExtendedClient stopBulk(String index, long timeout, TimeUnit timeUnit) throws IOException;
* Update replica level.
* @param indexDefinition the index definition
* @param level the replica level
* @return this
* @throws IOException if replica setting could not be updated
ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException;
* Update replica level.
* @param index index
* @param level the replica level
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return this
* @throws IOException if replica setting could not be updated
ExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) throws IOException;
* Get replica level.
* @param indexDefinition the index name
* @return the replica level of the index
int getReplicaLevel(IndexDefinition indexDefinition);
* Get replica level.
* @param index the index name
* @return the replica level of the index
int getReplicaLevel(String index);
* Refresh the index.
* @param index index
* @return this
ExtendedClient refreshIndex(String index);
* Flush the index. The cluster clears cache and completes indexing.
* @param index index
* @return this
ExtendedClient flushIndex(String index);
* Force segment merge of an index.
* @param indexDefinition th eindex definition
* @return this
boolean forceMerge(IndexDefinition indexDefinition);
* Force segment merge of an index.
* @param index the index
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return this
boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit);
* Wait for all outstanding bulk responses.
* @param timeout maximum wait time
* @param timeUnit unit of timeout value
* @return true if wait succeeded, false if wait timed out
boolean waitForResponses(long timeout, TimeUnit timeUnit);
* Wait for cluster being healthy.
* @param healthColor cluster health color to wait for
* @param maxWaitTime time value
* @param timeUnit time unit
* @return true if wait succeeded, false if wait timed out
boolean waitForCluster(String healthColor, long maxWaitTime, TimeUnit timeUnit);
* Get current health color.
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return the cluster health color
String getHealthColor(long maxWaitTime, TimeUnit timeUnit);
* Wait for index recovery (after replica change).
* @param index index
* @param maxWaitTime maximum wait time
* @param timeUnit time unit
* @return true if wait succeeded, false if wait timed out
boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit);
* Update index setting.
* @param index the index
* @param key the key of the value to be updated
* @param value the new value
* @param timeout timeout
* @param timeUnit time unit
* @throws IOException if update index setting failed
void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) throws IOException;
* Resolve alias.
* @param alias the alias
* @return this index name behind the alias or the alias if there is no index
String resolveAlias(String alias);
* Resolve alias to all connected indices, sort index names with most recent timestamp on top, return this index
* name.
* @param alias the alias
* @return the most recent index name pointing to the alias
String resolveMostRecentIndex(String alias);
* Get all index filters.
* @param index the index
* @return map of index filters
Map<String, String> getAliases(String index);
* Shift from one index to another.
* @param indexDefinition the index definition
* @param additionalAliases new aliases
* @return this
IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases);
* Shift from one index to another.
* @param indexDefinition the index definition
* @param additionalAliases new aliases
* @param indexAliasAdder method to add aliases
* @return this
IndexShiftResult shiftIndex(IndexDefinition indexDefinition, List<String> additionalAliases,
IndexAliasAdder indexAliasAdder);
* Shift from one index to another.
* @param index the index name
* @param fullIndexName the index name with timestamp
* @param additionalAliases a list of names that should be set as index aliases
* @return this
IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases);
* Shift from one index to another.
* @param index the index name
* @param fullIndexName the index name with timestamp
* @param additionalAliases a list of names that should be set as index aliases
* @param adder an adder method to create alias term queries
* @return this
IndexShiftResult shiftIndex(String index, String fullIndexName, List<String> additionalAliases,
IndexAliasAdder adder);
* Prune index.
* @param indexDefinition the index definition
* @return the index prune result
IndexPruneResult pruneIndex(IndexDefinition indexDefinition);
* Apply retention policy to prune indices. All indices before delta should be deleted,
* but the number of mintokeep indices must be kept.
* @param index index name
* @param fullIndexName index name with timestamp
* @param delta timestamp delta (for index timestamps)
* @param mintokeep minimum number of indices to keep
* @param perform true if pruning should be executed, false if not
* @return the index prune result
IndexPruneResult pruneIndex(String index, String fullIndexName, int delta, int mintokeep, boolean perform);
* Find the timestamp of the most recently indexed document in the index.
* @param index the index name
* @param timestampfieldname the timestamp field name
* @return millis UTC millis of the most recent document
* @throws IOException if most rcent document can not be found
Long mostRecentDocument(String index, String timestampfieldname) throws IOException;
* Get cluster name.
* @return the cluster name
String getClusterName();

View file

package org.xbib.elx.api;
public interface ExtendedClientProvider<C extends ExtendedClient> {
C getExtendedClient();

View file

package org.xbib.elx.api;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
public interface IndexAliasAdder {
void addIndexAlias(IndicesAliasesRequest requwst, String index, String alias);

View file

package org.xbib.elx.api;
import java.util.concurrent.TimeUnit;
public interface IndexDefinition {
IndexDefinition setIndex(String index);
String getIndex();
IndexDefinition setFullIndexName(String fullIndexName);
String getFullIndexName();
IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException;
IndexDefinition setSettingsUrl(URL settingsUrl);
URL getSettingsUrl();
IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException;
IndexDefinition setMappingsUrl(URL mappingsUrl);
URL getMappingsUrl();
IndexDefinition setDateTimePattern(String timeWindow);
String getDateTimePattern();
IndexDefinition setEnabled(boolean enabled);
boolean isEnabled();
IndexDefinition setIgnoreErrors(boolean ignoreErrors);
boolean ignoreErrors();
IndexDefinition setShift(boolean shift);
boolean isShiftEnabled();
IndexDefinition setForceMerge(boolean hasForceMerge);
boolean hasForceMerge();
IndexDefinition setReplicaLevel(int replicaLevel);
int getReplicaLevel();
IndexDefinition setRetention(IndexRetention indexRetention);
IndexRetention getRetention();
IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit);
long getMaxWaitTime();
TimeUnit getMaxWaitTimeUnit();
IndexDefinition setStartRefreshInterval(long seconds);
long getStartRefreshInterval();
IndexDefinition setStopRefreshInterval(long seconds);
long getStopRefreshInterval();

View file

package org.xbib.elx.api;
import java.util.List;
public interface IndexPruneResult {
State getState();
List<String> getCandidateIndices();
List<String> getDeletedIndices();
boolean isAcknowledged();

View file

package org.xbib.elx.api;
public interface IndexRetention {
IndexRetention setDelta(int delta);
int getDelta();
IndexRetention setMinToKeep(int minToKeep);
int getMinToKeep();

View file

package org.xbib.elx.api;
import java.util.List;
public interface IndexShiftResult {
List<String> getMovedAliases();
List<String> getNewAliases();

View file

* The API of the extended Elasticsearch clients.
package org.xbib.elx.api;

@ -0,0 +1,5 @@
compile project(':elx-api')
testCompile "org.xbib.elasticsearch:elasticsearch-analysis-common:${'elasticsearch-server.version')}"
testCompile "org.xbib.elasticsearch:transport-netty4:${'elasticsearch-server.version')}"

File diff suppressed because it is too large Load diff

View file

package org.xbib.elx.common;
import org.elasticsearch.Version;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.ExtendedClientProvider;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
public class ClientBuilder {
private final ElasticsearchClient client;
private final Settings.Builder settingsBuilder;
private Map<Class<? extends ExtendedClientProvider>, ExtendedClientProvider> providerMap;
private Class<? extends ExtendedClientProvider> provider;
public ClientBuilder() {
public ClientBuilder(ElasticsearchClient client) {
this(client, Thread.currentThread().getContextClassLoader());
public ClientBuilder(ElasticsearchClient client, ClassLoader classLoader) {
this.client = client;
this.settingsBuilder = Settings.builder();
settingsBuilder.put("", "elx-client-" + Version.CURRENT);
this.providerMap = new HashMap<>();
ServiceLoader<ExtendedClientProvider> serviceLoader = ServiceLoader.load(ExtendedClientProvider.class,
classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader());
for (ExtendedClientProvider provider : serviceLoader) {
providerMap.put(provider.getClass(), provider);
public static ClientBuilder builder() {
return new ClientBuilder();
public static ClientBuilder builder(ElasticsearchClient client) {
return new ClientBuilder(client);
public ClientBuilder provider(Class<? extends ExtendedClientProvider> provider) {
this.provider = provider;
return this;
public ClientBuilder put(String key, String value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Integer value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Long value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, Double value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, ByteSizeValue value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(String key, TimeValue value) {
settingsBuilder.put(key, value);
return this;
public ClientBuilder put(Settings settings) {
return this;
public <C extends ExtendedClient> C build() throws IOException {
if (provider == null) {
throw new IllegalArgumentException("no provider");
return (C) providerMap.get(provider).getExtendedClient().setClient(client).init(;

View file

package org.xbib.elx.common;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.xbib.elx.api.BulkController;
import org.xbib.elx.api.BulkMetric;
import org.xbib.elx.api.BulkProcessor;
import org.xbib.elx.api.ExtendedClient;
import org.xbib.elx.api.IndexDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class DefaultBulkController implements BulkController {
private static final Logger logger = LogManager.getLogger(DefaultBulkController.class);
private final ExtendedClient client;
private final BulkMetric bulkMetric;
private final List<String> indexNames;
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals;
private long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private BulkProcessor bulkProcessor;
private BulkListener bulkListener;
private AtomicBoolean active;
public DefaultBulkController(ExtendedClient client, BulkMetric bulkMetric) {
this.client = client;
this.bulkMetric = bulkMetric;
this.indexNames = new ArrayList<>(); = new AtomicBoolean(false);
this.startBulkRefreshIntervals = new HashMap<>();
this.stopBulkRefreshIntervals = new HashMap<>();
this.maxWaitTime = 30L;
this.maxWaitTimeUnit = TimeUnit.SECONDS;
public Throwable getLastBulkError() {
return bulkListener.getLastBulkError();
public void init(Settings settings) {
int maxActionsPerRequest = settings.getAsInt(,
int maxConcurrentRequests = settings.getAsInt(,
TimeValue flushIngestInterval = settings.getAsTime(,
ByteSizeValue maxVolumePerRequest = settings.getAsBytesSize(,
if (logger.isInfoEnabled()) {"bulk processor up with maxActionsPerRequest = {} maxConcurrentRequests = {} " +
"flushIngestInterval = {} maxVolumePerRequest = {}",
maxActionsPerRequest, maxConcurrentRequests, flushIngestInterval, maxVolumePerRequest);
this.bulkListener = new BulkListener();
DefaultBulkProcessor.Builder builder = DefaultBulkProcessor.builder((Client) client.getClient(), bulkListener)
this.bulkProcessor =;;
public void startBulkMode(IndexDefinition indexDefinition) throws IOException {
startBulkMode(indexDefinition.getFullIndexName(), indexDefinition.getStartRefreshInterval(),
public void startBulkMode(String indexName,
long startRefreshIntervalInSeconds,
long stopRefreshIntervalInSeconds) throws IOException {
if (!indexNames.contains(indexName)) {
startBulkRefreshIntervals.put(indexName, startRefreshIntervalInSeconds);
stopBulkRefreshIntervals.put(indexName, stopRefreshIntervalInSeconds);
if (startRefreshIntervalInSeconds != 0L) {
client.updateIndexSetting(indexName, "refresh_interval", startRefreshIntervalInSeconds + "s",
30L, TimeUnit.SECONDS);
public void index(IndexRequest indexRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(indexRequest.index(), indexRequest.type(),;
} catch (Exception e) {
bulkListener.lastBulkError = e;
if (logger.isErrorEnabled()) {
logger.error("bulk add of index failed: " + e.getMessage(), e);
public void delete(DeleteRequest deleteRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(deleteRequest.index(), deleteRequest.type(),;
} catch (Exception e) {
bulkListener.lastBulkError = e;
if (logger.isErrorEnabled()) {
logger.error("bulk add of delete failed: " + e.getMessage(), e);
public void update(UpdateRequest updateRequest) {
if (!active.get()) {
throw new IllegalStateException("inactive");
try {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().inc(updateRequest.index(), updateRequest.type(),;
} catch (Exception e) {
bulkListener.lastBulkError = e;
if (logger.isErrorEnabled()) {
logger.error("bulk add of update failed: " + e.getMessage(), e);
public boolean waitForResponses(long timeout, TimeUnit timeUnit) {
try {
return bulkProcessor.awaitFlush(timeout, timeUnit);
} catch (InterruptedException e) {
return false;
public void stopBulkMode(IndexDefinition indexDefinition) throws IOException {
indexDefinition.getMaxWaitTime(), indexDefinition.getMaxWaitTimeUnit());
public void stopBulkMode(String index, long timeout, TimeUnit timeUnit) throws IOException {
if (waitForResponses(timeout, timeUnit)) {
if (indexNames.contains(index)) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L) {
client.updateIndexSetting(index, "refresh_interval", secs + "s",
30L, TimeUnit.SECONDS);
public void flush() throws IOException {
if (bulkProcessor != null) {
public void close() throws IOException {
if (client.waitForResponses(maxWaitTime, maxWaitTimeUnit)) {
for (String index : indexNames) {
Long secs = stopBulkRefreshIntervals.get(index);
if (secs != null && secs != 0L)
client.updateIndexSetting(index, "refresh_interval", secs + "s",
30L, TimeUnit.SECONDS);
if (bulkProcessor != null) {
private class BulkListener implements DefaultBulkProcessor.Listener {
private final Logger logger = LogManager.getLogger("org.xbib.elx.BulkProcessor.Listener");
private Throwable lastBulkError = null;
public void beforeBulk(long executionId, BulkRequest request) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
int n = request.numberOfActions();
if (logger.isDebugEnabled()) {
logger.debug("before bulk [{}] [actions={}] [bytes={}] [concurrent requests={}]",
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long l = 0;
if (bulkMetric != null) {
l = bulkMetric.getCurrentIngest().getCount();
int n = 0;
for (BulkItemResponse itemResponse : response.getItems()) {
if (bulkMetric != null) {
bulkMetric.getCurrentIngest().dec(itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId());
if (itemResponse.isFailed()) {
if (bulkMetric != null) {
if (bulkMetric != null && logger.isDebugEnabled()) {
logger.debug("after bulk [{}] [succeeded={}] [failed={}] [{}ms] {} concurrent requests",
if (n > 0) {
if (logger.isErrorEnabled()) {
logger.error("bulk [{}] failed with {} failed items, failure message = {}",
executionId, n, response.buildFailureMessage());
} else {
if (bulkMetric != null) {
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
if (bulkMetric != null) {
lastBulkError = failure;
if (logger.isErrorEnabled()) {
logger.error("after bulk [" + executionId + "] error", failure);
Throwable getLastBulkError() {
return lastBulkError;

@ -1,16 +1,15 @@
package org.xbib.elasticsearch.client;
package org.xbib.elx.common;
import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.api.BulkMetric;
import org.xbib.metrics.Count;
import org.xbib.metrics.CountMetric;
import org.xbib.metrics.Meter;
import org.xbib.metrics.Metered;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SimpleBulkMetric implements BulkMetric {
private final ScheduledExecutorService executorService;
public class DefaultBulkMetric implements BulkMetric {
private final Meter totalIngest;
@ -30,13 +29,8 @@ public class SimpleBulkMetric implements BulkMetric {
private Long stopped;
public SimpleBulkMetric() {
public SimpleBulkMetric(ScheduledExecutorService executorService) {
this.executorService = executorService;
totalIngest = new Meter(executorService);
public DefaultBulkMetric() {
totalIngest = new Meter(Executors.newSingleThreadScheduledExecutor());
totalIngestSizeInBytes = new CountMetric();
currentIngest = new CountMetric();
currentIngestNumDocs = new CountMetric();
@ -45,6 +39,11 @@ public class SimpleBulkMetric implements BulkMetric {
failed = new CountMetric();
public void init(Settings settings) {
public Metered getTotalIngest() {
return totalIngest;
@ -80,6 +79,11 @@ public class SimpleBulkMetric implements BulkMetric {
return failed;
public long elapsed() {
return started != null ? ((stopped != null ? stopped : System.nanoTime()) - started) : -1L;
public void start() {
this.started = System.nanoTime();
@ -90,12 +94,11 @@ public class SimpleBulkMetric implements BulkMetric {
public void stop() {
this.stopped = System.nanoTime();
public long elapsed() {
return (stopped != null ? stopped : System.nanoTime()) - started;
public void close() {

@ -1,19 +1,21 @@
package org.xbib.elasticsearch.client;
package org.xbib.elx.common;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.xbib.elx.api.BulkProcessor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -27,36 +29,38 @@ import java.util.concurrent.atomic.AtomicLong;
* requests allowed to be executed in parallel.
* In order to create a new bulk processor, use the {@link Builder}.
public class BulkProcessor implements Closeable {
public class DefaultBulkProcessor implements BulkProcessor {
private final int maximumBulkActionsPerRequest;
private final int bulkActions;
private final long maximumBulkRequestByteSize;
private final long bulkSize;
private final ScheduledThreadPoolExecutor scheduler;
private final ScheduledFuture<?> scheduledFuture;
private final AtomicLong executionIdGen = new AtomicLong();
private final AtomicLong executionIdGen;
private final BulkExecutor bulkExecutor;
private final BulkRequestHandler bulkRequestHandler;
private BulkRequest bulkRequest;
private volatile boolean closed = false;
private volatile boolean closed;
private BulkProcessor(ElasticsearchClient client, Listener listener, int maximumConcurrentBulkRequests,
int maximumBulkActionsPerRequest, ByteSizeValue maximumBulkRequestByteSize,
@Nullable TimeValue flushInterval) {
this.maximumBulkActionsPerRequest = maximumBulkActionsPerRequest;
this.maximumBulkRequestByteSize = maximumBulkRequestByteSize.getBytes();
private DefaultBulkProcessor(Client client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkExecutor = maximumConcurrentBulkRequests == 0 ?
new SyncBulkExecutor(client, listener) :
new AsyncBulkExecutor(client, listener, maximumConcurrentBulkRequests);
this.bulkRequestHandler = concurrentRequests == 0 ?
new SyncBulkRequestHandler(client, listener) :
new AsyncBulkRequestHandler(client, listener, concurrentRequests);
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
name != null ? "[" + name + "]" : "" + "bulk_processor"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(),
@ -67,7 +71,7 @@ public class BulkProcessor implements Closeable {
public static Builder builder(ElasticsearchClient client, Listener listener) {
public static Builder builder(Client client, Listener listener) {
if (client == null) {
throw new NullPointerException("The client you specified while building a BulkProcessor is null");
@ -75,20 +79,28 @@ public class BulkProcessor implements Closeable {
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
* Wait for bulk request handler with flush.
* @param timeout the timeout value
* @return true is method was successful, false if timeout
* @throws InterruptedException if timeout
public void close() {
try {
awaitClose(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException exc) {
public synchronized boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
// flush
if (bulkRequest.numberOfActions() > 0) {
// wait for all bulk responses
return this.bulkRequestHandler.close(timeout, unit);
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are
* flushed.
* Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called
* once as the last action of a bulk processor.
* If concurrent requests are not enabled, returns {@code true} immediately.
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then
@ -101,98 +113,50 @@ public class BulkProcessor implements Closeable {
* bulk requests completed
* @throws InterruptedException If the current thread is interrupted
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
closed = true;
if (this.scheduledFuture != null) {
if (bulkRequest.numberOfActions() > 0) {
return bulkExecutor.awaitClose(timeout, unit);
return this.bulkRequestHandler.close(timeout, unit);
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
* Adds either a delete or an index request.
* @param request request
* @return his bulk processor
public synchronized BulkProcessor add(IndexRequest request) {
if (request == null) {
return this;
if (isOverTheLimit()) {
return this;
public DefaultBulkProcessor add(ActionRequest request) {
return add(request, null);
* Adds an {@link DeleteRequest} to the list of actions to execute.
* Adds either a delete or an index request with a payload.
* @param request request
* @param payload payload
* @return his bulk processor
public synchronized BulkProcessor add(DeleteRequest request) {
if (request == null) {
return this;
if (isOverTheLimit()) {
public DefaultBulkProcessor add(ActionRequest request, Object payload) {
internalAdd(request, payload);
return this;
* Adds an {@link UpdateRequest} to the list of actions to execute.
* @param request request
* @return his bulk processor
public synchronized BulkProcessor add(UpdateRequest request) {
if (request == null) {
return this;
if (isOverTheLimit()) {
private void ensureOpen() {
if (closed) {
throw new IllegalStateException("bulk process already closed");
private boolean isOverTheLimit() {
final int count = bulkRequest.numberOfActions();
return count > 0 &&
(maximumBulkActionsPerRequest != -1 && count >= maximumBulkActionsPerRequest) ||
(maximumBulkRequestByteSize != -1 && bulkRequest.estimatedSizeInBytes() >= maximumBulkRequestByteSize);
private void execute() {
final BulkRequest myBulkRequest = this.bulkRequest;
bulkExecutor.execute(myBulkRequest, executionIdGen.incrementAndGet());
this.bulkRequest = new BulkRequest();
* Flush pending delete or index requests.
public synchronized void flush() {
if (bulkRequest.numberOfActions() > 0) {
@ -201,39 +165,58 @@ public class BulkProcessor implements Closeable {
* A listener for the execution.
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
public interface Listener {
public void close() {
try {
// 0 = immediate close
awaitClose(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException exc) {
* Callback before the bulk is executed.
* @param executionId execution ID
* @param request request
void beforeBulk(long executionId, BulkRequest request);
private void ensureOpen() {
if (closed) {
* Callback after a successful execution of bulk request.
* @param executionId execution ID
* @param request request
* @param response response
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
private synchronized void internalAdd(ActionRequest request, Object payload) {
if (request instanceof IndexRequest) {
bulkRequest.add((IndexRequest) request, payload);
} else if (request instanceof DeleteRequest) {
bulkRequest.add((DeleteRequest) request, payload);
} else if (request instanceof UpdateRequest) {
bulkRequest.add((UpdateRequest) request, payload);
} else {
throw new UnsupportedOperationException();
* Callback after a failed execution of bulk request.
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request
* processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
* @param executionId execution ID
* @param request request
* @param failure failure
void afterBulk(long executionId, BulkRequest request, Throwable failure);
private void executeIfNeeded() {
if (!isOverTheLimit()) {
private void execute() {
final BulkRequest myBulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler.execute(myBulkRequest, executionId);
return bulkActions != -1 &&
bulkRequest.numberOfActions() >= bulkActions ||
bulkSize != -1 &&
bulkRequest.estimatedSizeInBytes() >= bulkSize;
@ -241,11 +224,18 @@ public class BulkProcessor implements Closeable {
public static class Builder {
private final ElasticsearchClient client;
private final Client client;
private final Listener listener;
private String name;
private int concurrentRequests = 1;
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
@ -255,11 +245,22 @@ public class BulkProcessor implements Closeable {
* @param client the client
* @param listener the listener
Builder(ElasticsearchClient client, Listener listener) {
Builder(Client client, Listener listener) {
this.client = client;
this.listener = listener;
* Sets an optional name to identify this bulk processor.
* @param name name
* @return this builder
public Builder setName(String name) { = name;
return this;
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
@ -277,7 +278,7 @@ public class BulkProcessor implements Closeable {
* Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
* {@code 1000}. Can be set to {@code -1} to disable it.
* @param bulkActions mbulk actions
* @param bulkActions bulk actions
* @return this builder
public Builder setBulkActions(int bulkActions) {
@ -299,7 +300,7 @@ public class BulkProcessor implements Closeable {
* Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(ByteSizeValue)}
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
* can be set to {@code -1} with the flush interval set allowing for complete async processing of bulk actions.
* @param flushInterval flush interval
@ -315,8 +316,8 @@ public class BulkProcessor implements Closeable {
* @return a bulk processor
public BulkProcessor build() {
return new BulkProcessor(client, listener, concurrentRequests, bulkActions, bulkSize, flushInterval);
public DefaultBulkProcessor build() {
return new DefaultBulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
@ -324,32 +325,25 @@ public class BulkProcessor implements Closeable {
public void run() {
synchronized (BulkProcessor.this) {
synchronized (DefaultBulkProcessor.this) {
if (bulkRequest.numberOfActions() > 0) {
if (bulkRequest.numberOfActions() == 0) {
interface BulkExecutor {
private static class SyncBulkRequestHandler implements BulkRequestHandler {
void execute(BulkRequest bulkRequest, long executionId);
private final Client client;
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
private final DefaultBulkProcessor.Listener listener;
private static class SyncBulkExecutor implements BulkExecutor {
private final ElasticsearchClient client;
private final BulkProcessor.Listener listener;
SyncBulkExecutor(ElasticsearchClient client, BulkProcessor.Listener listener) {
SyncBulkRequestHandler(Client client, DefaultBulkProcessor.Listener listener) {
this.client = client;
this.listener = listener;
@ -370,22 +364,22 @@ public class BulkProcessor implements Closeable {
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
public boolean close(long timeout, TimeUnit unit) {
return true;
private static class AsyncBulkExecutor implements BulkExecutor {
private static class AsyncBulkRequestHandler implements BulkRequestHandler {
private final ElasticsearchClient client;
private final Client client;
private final BulkProcessor.Listener listener;
private final DefaultBulkProcessor.Listener listener;
private final Semaphore semaphore;
private final int concurrentRequests;
private AsyncBulkExecutor(ElasticsearchClient client, BulkProcessor.Listener listener, int concurrentRequests) {
private AsyncBulkRequestHandler(Client client, DefaultBulkProcessor.Listener listener, int concurrentRequests) {
this.client = client;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
@ -400,7 +394,7 @@ public class BulkProcessor implements Closeable {
listener.beforeBulk(executionId, bulkRequest);
acquired = true;
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<BulkResponse>() {
client.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() {
public void onResponse(BulkResponse response) {
try {
@ -433,9 +427,9 @@ public class BulkProcessor implements Closeable {
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
return true;
return false;

@ -0,0 +1,214 @@
package org.xbib.elx.common;
import org.xbib.elx.api.IndexDefinition;
import org.xbib.elx.api.IndexRetention;
import java.util.concurrent.TimeUnit;
public class DefaultIndexDefinition implements IndexDefinition {
private String index;
private String fullIndexName;
private String dateTimePattern;
private URL settingsUrl;
private URL mappingsUrl;
private boolean enabled;
private boolean ignoreErrors;
private boolean switchAliases;
private boolean hasForceMerge;
private int replicaLevel;
private IndexRetention indexRetention;
private long maxWaitTime;
private TimeUnit maxWaitTimeUnit;
private long startRefreshInterval;
private long stopRefreshInterval;
public IndexDefinition setIndex(String index) {
this.index = index;
return this;
public String getIndex() {
return index;
public IndexDefinition setFullIndexName(String fullIndexName) {
this.fullIndexName = fullIndexName;
return this;
public String getFullIndexName() {
return fullIndexName;
public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException {
return this;
public IndexDefinition setSettingsUrl(URL settingsUrl) {
this.settingsUrl = settingsUrl;
return this;
public URL getSettingsUrl() {
return settingsUrl;
public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException {
this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null;
return this;
public IndexDefinition setMappingsUrl(URL mappingsUrl) {
this.mappingsUrl = mappingsUrl;
return this;
public URL getMappingsUrl() {
return mappingsUrl;
public IndexDefinition setDateTimePattern(String timeWindow) {
this.dateTimePattern = timeWindow;
return this;
public String getDateTimePattern() {
return dateTimePattern;
public IndexDefinition setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
public boolean isEnabled() {
return enabled;
public IndexDefinition setIgnoreErrors(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
return this;
public boolean ignoreErrors() {
return ignoreErrors;
public IndexDefinition setShift(boolean switchAliases) {
this.switchAliases = switchAliases;
return this;
public boolean isShiftEnabled() {
return switchAliases;
public IndexDefinition setForceMerge(boolean hasForceMerge) {
this.hasForceMerge = hasForceMerge;
return this;
public boolean hasForceMerge() {
return hasForceMerge;
public IndexDefinition setReplicaLevel(int replicaLevel) {
this.replicaLevel = replicaLevel;
return this;
public int getReplicaLevel() {
return replicaLevel;
public IndexDefinition setRetention(IndexRetention indexRetention) {
this.indexRetention = indexRetention;
return this;
public IndexRetention getRetention() {
return indexRetention;
public IndexDefinition setMaxWaitTime(long maxWaitTime, TimeUnit timeUnit) {
this.maxWaitTime = maxWaitTime;
this.maxWaitTimeUnit = timeUnit;
return this;
public long getMaxWaitTime() {
return maxWaitTime;
public TimeUnit getMaxWaitTimeUnit() {
return maxWaitTimeUnit;
public IndexDefinition setStartRefreshInterval(long seconds) {
this.startRefreshInterval = seconds;
return this;
public long getStartRefreshInterval() {
return startRefreshInterval;
public IndexDefinition setStopRefreshInterval(long seconds) {
this.stopRefreshInterval = seconds;
return this;
public long getStopRefreshInterval() {
return stopRefreshInterval;

@ -0,0 +1,32 @@
package org.xbib.elx.common;
import org.xbib.elx.api.IndexRetention;
public class DefaultIndexRetention implements IndexRetention {
private int delta;
private int minToKeep;
public IndexRetention setDelta(int delta) { = delta;
return this;
public int getDelta() {
return delta;
public IndexRetention setMinToKeep(int minToKeep) {
this.minToKeep = minToKeep;
return this;
public int getMinToKeep() {
return minToKeep;

View file

package org.xbib.elx.common;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.TimeUnit;
* A mocked client, it does not perform any actions on a cluster. Useful for testing.
public class MockExtendedClient extends AbstractExtendedClient {
public ElasticsearchClient getClient() {
return null;
public MockExtendedClient init(Settings settings) {
return this;
protected ElasticsearchClient createClient(Settings settings) {
return null;
protected void closeClient() {
public MockExtendedClient index(String index, String id, boolean create, String source) {
return this;
public MockExtendedClient delete(String index, String id) {
return this;
public MockExtendedClient update(String index, String id, String source) {
return this;
public MockExtendedClient index(IndexRequest indexRequest) {
return this;
public MockExtendedClient delete(DeleteRequest deleteRequest) {
return this;
public MockExtendedClient update(UpdateRequest updateRequest) {
return this;
public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) {
return this;
public MockExtendedClient stopBulk(String index, long maxWaitTime, TimeUnit timeUnit) {
return this;
public MockExtendedClient newIndex(String index) {
return this;
public MockExtendedClient deleteIndex(String index) {
return this;
public MockExtendedClient refreshIndex(String index) {
return this;
public MockExtendedClient flushIndex(String index) {
return this;
public boolean forceMerge(String index, long maxWaitTime, TimeUnit timeUnit) {
return true;
public boolean waitForCluster(String healthColor, long timeValue, TimeUnit timeUnit) {
return true;
public boolean waitForResponses(long maxWaitTime, TimeUnit timeUnit) {
return true;
public boolean waitForRecovery(String index, long maxWaitTime, TimeUnit timeUnit) {
return true;
public MockExtendedClient updateReplicaLevel(String index, int level, long maxWaitTime, TimeUnit timeUnit) {
return this;
public void flush() {
// nothing to do
public void close() {
// nothing to do

@ -0,0 +1,10 @@
package org.xbib.elx.common;
import org.xbib.elx.api.ExtendedClientProvider;
public class MockExtendedClientProvider implements ExtendedClientProvider<MockExtendedClient> {
public MockExtendedClient getExtendedClient() {
return new MockExtendedClient();

View file

package org.xbib.elx.common;
public enum Parameters {
MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"),
int num;
String string;
Parameters(int num) {
this.num = num;
Parameters(String string) {
this.string = string;
int getNum() {
return num;
String getString() {
return string;

@ -0,0 +1,25 @@
public class ClasspathURLStreamHandler extends URLStreamHandler {
private final ClassLoader classLoader;
public ClasspathURLStreamHandler() {
this.classLoader = getClass().getClassLoader();
public ClasspathURLStreamHandler(ClassLoader classLoader) {
this.classLoader = classLoader;
protected URLConnection openConnection(URL u) throws IOException {
final URL resourceUrl = classLoader.getResource(u.getPath());
return resourceUrl != null ? resourceUrl.openConnection() : null;

@ -0,0 +1,12 @@
public class ClasspathURLStreamHandlerFactory implements URLStreamHandlerFactory {
public URLStreamHandler createURLStreamHandler(String protocol) {
return "classpath".equals(protocol) ? new ClasspathURLStreamHandler() : null;

View file

View file

package org.xbib.elx.common;

View file

package org.xbib.elasticsearch.client;
package org.xbib.elx.common.util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -16,6 +16,9 @@ import java.util.Enumeration;
import java.util.List;
import java.util.Locale;
public class NetworkUtils {
private static final Logger logger = LogManager.getLogger(NetworkUtils.class.getName());
@ -100,10 +103,8 @@ public class NetworkUtils {
NetworkInterface networkInterface = interfaces.nextElement();
Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces();
if (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) {
@ -221,10 +222,8 @@ public class NetworkUtils {
NetworkInterface networkInterface = interfaces.nextElement();
Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces();
if (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) {
@ -250,6 +249,9 @@ public class NetworkUtils {
return left.length - right.length;
public enum ProtocolVersion {

View file

package org.xbib.elx.common.util;

View file

View file

@ -0,0 +1 @@

@ -1,4 +1,7 @@
package org.xbib.elasticsearch.client.common;
package org.xbib.elx.common.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
@ -9,8 +12,9 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.testframework.ESSingleNodeTestCase;
import org.junit.Test;
import java.util.Collections;
import java.util.Iterator;
@ -19,58 +23,72 @@ import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class AliasTests extends ESSingleNodeTestCase {
public class AliasTest extends TestBase {
private static final Logger logger = LogManager.getLogger(AliasTests.class.getName());
private static final Logger logger = LogManager.getLogger(AliasTest.class.getName());
public void testAlias() {
Client client = client("1");
CreateIndexRequest indexRequest = new CreateIndexRequest("test");
// put alias
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[]{"test"};
String[] aliases = new String[]{"test_alias"};
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
// get alias
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY);
long t0 = System.nanoTime();
GetAliasesResponse getAliasesResponse = client().admin().indices().getAliases(getAliasesRequest).actionGet();
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(getAliasesRequest).actionGet();
long t1 = (System.nanoTime() - t0) / 1000000;"{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
assertTrue(t1 >= 0);
public void testMostRecentIndex() {
Client client = client("1");
String alias = "test";
CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101");
indexRequest = new CreateIndexRequest("test20160102");
indexRequest = new CreateIndexRequest("test20160103");
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
.indices("test20160101", "test20160102", "test20160103")
String[] indices = new String[]{"test20160101", "test20160102", "test20160103"};
String[] aliases = new String[]{alias};
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client(),
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client,
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");
Set<String> result = new TreeSet<>(Collections.reverseOrder());
for (ObjectCursor<String> indexName : getAliasesResponse.getAliases().keys()) {
Matcher m = pattern.matcher(indexName.value);
if (m.matches() && alias.equals( {
if (m.matches()) {
if (alias.equals( {
Iterator<String> it = result.iterator();
assertEquals("test20160101",;"result={}", result);"success: result={}", result);

View file

package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class ClusterBlockTest extends TestBase {
private static final Logger logger = LogManager.getLogger("test");
public void startNodes() {
try {
// do not wait for green health state"ready");
} catch (Throwable t) {
logger.error("startNodes failed", t);
protected Settings getNodeSettings() {
return Settings.builder()
.put("discovery.zen.minimum_master_nodes", 2) // block until we have two nodes
@Test(expected = ClusterBlockException.class)
public void testClusterBlock() throws Exception {
Client client = client("1");
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field1", "value1").endObject();
IndexRequestBuilder irb = client.prepareIndex("test", "test", "1").setSource(builder);
BulkRequestBuilder brb = client.prepareBulk();

View file

package org.xbib.elx.common.test;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.MockExtendedClient;
import org.xbib.elx.common.MockExtendedClientProvider;
import static org.junit.Assert.assertNotNull;
public class MockExtendedClientProviderTest {
public void testMockExtendedProvider() throws IOException {
MockExtendedClient client = ClientBuilder.builder().provider(MockExtendedClientProvider.class).build();

View file

package org.xbib.elx.common.test;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
public class MockNode extends Node {
public MockNode(Settings settings, List<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);

@ -1,4 +1,4 @@
package org.xbib.elasticsearch.client.common;
package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -13,17 +13,12 @@ public class NetworkTest {
private static final Logger logger = LogManager.getLogger(NetworkTest.class);
* Demonstrates the slowness oj Java network interface lookup on certain environments.
* May be a killer for ES node startup - so avoid automatic traversal of NICs at all costs.
* @throws Exception if test fails
public void testNetwork() throws Exception {
// walk over all found interfaces (this is slow - multicast/pings are performed)
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
for (NetworkInterface netint : Collections.list(nets)) {"checking network interface = " + netint.getName());
System.out.println("checking network interface = " + netint.getName());
Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
for (InetAddress addr : Collections.list(inetAddresses)) {"found address = " + addr.getHostAddress()

View file

package org.xbib.elx.common.test;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class SearchTest extends TestBase {
public void testSearch() throws Exception {
Client client = client("1");
BulkRequestBuilder builder = new BulkRequestBuilder(client, BulkAction.INSTANCE);
for (int i = 0; i < 1000; i++) {
IndexRequest indexRequest = new IndexRequest("pages", "row")
.field("user1", "joerg")
.field("user2", "joerg")
.field("user3", "joerg")
.field("user4", "joerg")
.field("user5", "joerg")
.field("user6", "joerg")
.field("user7", "joerg")
.field("user8", "joerg")
.field("user9", "joerg")
.field("rowcount", i)
.field("rs", 1234)
client.admin().indices().refresh(new RefreshRequest()).actionGet();
for (int i = 0; i < 100; i++) {
QueryBuilder queryStringBuilder = QueryBuilders.queryStringQuery("rs:" + 1234);
SearchRequestBuilder requestBuilder = client.prepareSearch()
.addSort("rowcount", SortOrder.DESC)
.setFrom(i * 10).setSize(10);
SearchResponse searchResponse = requestBuilder.execute().actionGet();
assertTrue(searchResponse.getHits().getTotalHits() > 0);

package org.xbib.elasticsearch.client.common;
package org.xbib.elx.common.test;
import static org.junit.Assert.assertEquals;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.testframework.ESSingleNodeTestCase;
import org.junit.Test;
public class SimpleTests extends ESSingleNodeTestCase {
private static final Logger logger = LogManager.getLogger(SimpleTests.class.getName());
public class SimpleTest extends TestBase {
public void test() throws Exception {
try {
DeleteIndexRequestBuilder deleteIndexRequestBuilder =
new DeleteIndexRequestBuilder(client(), DeleteIndexAction.INSTANCE, "test");
new DeleteIndexRequestBuilder(client("1"), DeleteIndexAction.INSTANCE, "test");
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} catch (IndexNotFoundException e) {
// ignore if index not found
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client(),
.put("index.analysis.analyzer.default.filter.0", "lowercase")
// where is the trim token filter???
//.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
Settings indexSettings = Settings.builder()
.put("index.analysis.analyzer.default.filter.0", "lowercase")
.put("index.analysis.analyzer.default.filter.1", "trim")
.put("index.analysis.analyzer.default.tokenizer", "keyword")
CreateIndexRequestBuilder createIndexRequestBuilder = new CreateIndexRequestBuilder(client("1"), CreateIndexAction.INSTANCE);
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client(), IndexAction.INSTANCE);
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(client("1"), IndexAction.INSTANCE);
String doc = client().prepareSearch("test")
RefreshRequestBuilder refreshRequestBuilder = new RefreshRequestBuilder(client("1"), RefreshAction.INSTANCE);
String doc = client("1").prepareSearch("test")

View file

package org.xbib.elx.common.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.junit.After;
import org.junit.Before;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class TestBase {
private static final Logger logger = LogManager.getLogger("test");
private static final Random random = new Random();
private static final char[] numbersAndLetters = ("0123456789abcdefghijklmnopqrstuvwxyz").toCharArray();
private Map<String, Node> nodes = new HashMap<>();
private Map<String, AbstractClient> clients = new HashMap<>();
private String cluster;
private String host;
private int port;
public void startNodes() {
try {"starting");
try {
ClusterHealthResponse healthResponse = client("1").execute(ClusterHealthAction.INSTANCE,
new ClusterHealthRequest().waitForStatus(ClusterHealthStatus.GREEN)
if (healthResponse != null && healthResponse.isTimedOut()) {
throw new IOException("cluster state is " + healthResponse.getStatus().name()
+ ", from here on, everything will fail!");
} catch (ElasticsearchTimeoutException e) {
throw new IOException("cluster does not respond to health request, cowardly refusing to continue");
ClusterStateRequestBuilder clusterStateRequestBuilder =
new ClusterStateRequestBuilder(client("1"), ClusterStateAction.INSTANCE).all();
ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();"cluster name = {}", clusterStateResponse.getClusterName().value());"host = {} port = {}", host, port);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
public void stopNodes() {
try {
} catch (Exception e) {
logger.error("can not close nodes", e);
} finally {
try {
deleteFiles();"data files wiped");
Thread.sleep(2000L); // let OS commit changes
} catch (IOException e) {
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
// ignore
protected Settings getTransportSettings() {
return Settings.builder()
.put("host", host)
.put("port", port)
.put("", cluster)
.put("path.home", getHome())
protected Settings getNodeSettings() {
return Settings.builder()
.put("", cluster)
.put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
.put("path.home", getHome())
protected static String getHome() {
return System.getProperty("path.home", System.getProperty("user.dir"));
protected void startNode(String id) throws NodeValidationException {
protected AbstractClient client(String id) {
return clients.get(id);
protected void setClusterName(String cluster) {
this.cluster = cluster;
protected String getClusterName() {
return cluster;
protected String randomString(int len) {
final char[] buf = new char[len];
final int n = numbersAndLetters.length - 1;
for (int i = 0; i < buf.length; i++) {
buf[i] = numbersAndLetters[random.nextInt(n)];
return new String(buf);
private void closeNodes() throws IOException {"closing all clients");
for (AbstractClient client : clients.values()) {
clients.clear();"closing all nodes");
for (Node node : nodes.values()) {
if (node != null) {
nodes.clear();"all nodes closed");
private void findNodeAddress() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().transport(true);
NodesInfoResponse response = client("1").admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
TransportAddress address= response.getNodes().iterator().next().getTransport().getAddress()
host = address.address().getHostName();
port = address.address().getPort();
private Node buildNode(String id) {
Settings nodeSettings = Settings.builder()
.put("", id)
List<Class<? extends Plugin>> plugins = Arrays.asList(CommonAnalysisPlugin.class, Netty4Plugin.class);
Node node = new MockNode(nodeSettings, plugins);
AbstractClient client = (AbstractClient) node.client();
nodes.put(id, node);
clients.put(id, client);"clients={}", clients);
return node;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
Files.walkFileTree(directory, new SimpleFileVisitor<>() {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
return FileVisitResult.CONTINUE;
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;

package org.xbib.elx.common.test;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
public class WildcardTest extends TestBase {
public void testWildcard() throws Exception {
index(client("1"), "1", "010");
index(client("1"), "2", "0*0");
// exact
validateCount(client("1"), QueryBuilders.queryStringQuery("010").defaultField("field"), 1);
validateCount(client("1"), QueryBuilders.queryStringQuery("0\\*0").defaultField("field"), 1);
// pattern
validateCount(client("1"), QueryBuilders.queryStringQuery("0*0").defaultField("field"), 1); // 2?
validateCount(client("1"), QueryBuilders.queryStringQuery("0?0").defaultField("field"), 1); // 2?
validateCount(client("1"), QueryBuilders.queryStringQuery("0**0").defaultField("field"), 1); // 2?
validateCount(client("1"), QueryBuilders.queryStringQuery("0??0").defaultField("field"), 0);
validateCount(client("1"), QueryBuilders.queryStringQuery("*10").defaultField("field"), 1);
validateCount(client("1"), QueryBuilders.queryStringQuery("*1*").defaultField("field"), 1);
validateCount(client("1"), QueryBuilders.queryStringQuery("*\\*0").defaultField("field"), 0); // 1?
validateCount(client("1"), QueryBuilders.queryStringQuery("*\\**").defaultField("field"), 0); // 1?
private void index(Client client, String id, String fieldValue) throws IOException {
client.index(new IndexRequest("index", "type", id)
.source(XContentFactory.jsonBuilder().startObject().field("field", fieldValue).endObject()))
client.admin().indices().refresh(new RefreshRequest()).actionGet();
private long count(Client client, QueryBuilder queryBuilder) {
return client.prepareSearch("index").setTypes("type")
private void validateCount(Client client, QueryBuilder queryBuilder, long expectedHits) {
final long actualHits = count(client, queryBuilder);
if (actualHits != expectedHits) {
throw new RuntimeException("actualHits=" + actualHits + ", expectedHits=" + expectedHits);

@ -0,0 +1,4 @@
package org.xbib.elx.common.test;

@ -2,11 +2,11 @@
<configuration status="OFF">
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="[%d{ABSOLUTE}][%-5p][%-25c][%t] %m%n"/>
<PatternLayout pattern="[%d{ISO8601}][%-5p][%-25c][%t] %m%n"/>
<Root level="debug">
<Root level="info">
<AppenderRef ref="Console" />

View file

@ -0,0 +1,6 @@
compile project(':elx-common')
compile "org.xbib.elasticsearch:transport-netty4:${'elasticsearch-server.version')}"
compile "org.xbib:netty-http-client:${'xbib-netty-http.version')}"
testCompile "org.xbib.elasticsearch:elasticsearch-analysis-common:${'elasticsearch-server.version')}"

@ -1,4 +1,4 @@
package org.xbib.elasticsearch.client.http;
package org.xbib.elx.http;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -13,21 +13,16 @@ import;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.xbib.elasticsearch.client.AbstractClient;
import org.xbib.elasticsearch.client.BulkControl;
import org.xbib.elasticsearch.client.BulkMetric;
import org.xbib.elx.common.AbstractExtendedClient;
import org.xbib.netty.http.client.Client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -35,52 +30,44 @@ import;
* Elasticsearch HTTP client.
public class HttpClient extends AbstractClient implements ElasticsearchClient {
public class ExtendedHttpClient extends AbstractExtendedClient implements ElasticsearchClient {
private static final Logger logger = LogManager.getLogger(HttpClient.class);
private static final Logger logger = LogManager.getLogger(ExtendedHttpClient.class);
private Client client;
private Client nettyHttpClient;
private NamedXContentRegistry registry;
private final ClassLoader classLoader;
private final NamedXContentRegistry registry;
private Map<GenericAction, HttpAction> actionMap;
private final Map<GenericAction, HttpAction> actionMap;
private List<String> urls;
private String url;
//private ThreadPool threadPool;
public HttpClient init(ElasticsearchClient client, Settings settings, BulkMetric metric, BulkControl control) {
init(client, settings, metric, control, null, Collections.emptyList());
return this;
public ExtendedHttpClient(List<NamedXContentRegistry.Entry> namedXContentEntries, ClassLoader classLoader) {
this.registry = new NamedXContentRegistry(Stream.of(getNamedXContents().stream(),;
this.classLoader = classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader();
this.actionMap = new HashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"})
private void init(ElasticsearchClient client, Settings settings, BulkMetric metric, BulkControl control,
ClassLoader classLoader, List<NamedXContentRegistry.Entry> namedXContentEntries) {
//super.init(client, settings, metric, control);
this.urls = settings.getAsList("urls");
if (urls.isEmpty()) {
throw new IllegalArgumentException("no urls given");
public ExtendedHttpClient init(Settings settings) throws IOException {
if (settings == null) {
return null;
this.registry = new NamedXContentRegistry(Stream.of(getNamedXContents().stream(),
this.actionMap = new HashMap<>();
ServiceLoader<HttpAction> httpActionServiceLoader = ServiceLoader.load(HttpAction.class,
classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader());
this.url = settings.get("url");
ServiceLoader<HttpAction> httpActionServiceLoader = ServiceLoader.load(HttpAction.class, classLoader);
for (HttpAction<? extends ActionRequest, ? extends ActionResponse> httpAction : httpActionServiceLoader) {
actionMap.put(httpAction.getActionInstance(), httpAction);
this.client = Client.builder().enableDebug().build();
Settings threadPoolsettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "httpclient")
//this.threadPool = threadPool != null ? threadPool : new ThreadPool(threadPoolsettings);"HTTP client initialized with {} actions", actionMap.size());
this.nettyHttpClient = Client.builder().enableDebug().build();"extended HTTP client initialized with {} actions", actionMap.size());
return this;
private static List<NamedXContentRegistry.Entry> getNamedXContents() {
@ -91,28 +78,23 @@ public class HttpClient extends AbstractClient implements ElasticsearchClient {
return registry;
public static Builder builder() {
return new Builder();
public Client internalClient() {
return client;
return nettyHttpClient;
public ElasticsearchClient client() {
public ElasticsearchClient getClient() {
return this;
protected ElasticsearchClient createClient(Settings settings) throws IOException {
protected ElasticsearchClient createClient(Settings settings) {
return this;
public void shutdown() throws IOException {
protected void closeClient() throws IOException {
@ -142,68 +124,22 @@ public class HttpClient extends AbstractClient implements ElasticsearchClient {
public ThreadPool threadPool() {"returning null for threadPool() request");
return null; //threadPool;
return null;
@SuppressWarnings({"unchecked", "rawtypes"})
public <R extends ActionRequest, T extends ActionResponse, B extends ActionRequestBuilder<R, T, B>>
private <R extends ActionRequest, T extends ActionResponse, B extends ActionRequestBuilder<R, T, B>>
void doExecute(Action<R, T, B> action, R request, ActionListener<T> listener) {
HttpAction httpAction = actionMap.get(action);
if (httpAction == null) {
throw new IllegalStateException("failed to find http action [" + action + "] to execute");
}"http action = " + httpAction);
String url = urls.get(0); // TODO
try {"submitting to URL {}", url);
HttpActionContext httpActionContext = new HttpActionContext(this, request, url);
httpAction.execute(httpActionContext, listener);"submitted to URL {}", url);
logger.debug("submitted to URL {}", url);
} catch (Exception e) {
logger.error(e.getMessage(), e);
* The Builder for HTTP client.
public static class Builder {
private final Settings.Builder settingsBuilder = Settings.builder();
private ClassLoader classLoader;
private List<NamedXContentRegistry.Entry> namedXContentEntries;
private ThreadPool threadPool = null;
public Builder settings(Settings settings) {
return this;
public Builder classLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
public Builder namedXContentEntries(List<NamedXContentRegistry.Entry> namedXContentEntries) {
this.namedXContentEntries = namedXContentEntries;
return this;
public Builder threadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
return this;
@SuppressWarnings({"unchecked", "rawtypes"})
public HttpClient build() {
Settings settings =;
HttpClient httpClient = new HttpClient();
httpClient.init(null, settings, null, null,
classLoader, namedXContentEntries);
return httpClient;

package org.xbib.elx.http;
import org.xbib.elx.api.ExtendedClientProvider;
import java.util.Collections;
public class ExtendedHttpClientProvider implements ExtendedClientProvider<ExtendedHttpClient> {
public ExtendedHttpClient getExtendedClient() {
return new ExtendedHttpClient(Collections.emptyList(), Thread.currentThread().getContextClassLoader());

package org.xbib.elasticsearch.client.http;
package org.xbib.elx.http;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
@ -68,7 +68,7 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
" content = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8));
Transport transport = httpActionContext.getHttpClient().internalClient().execute(httpRequest);
Transport transport = httpActionContext.getExtendedHttpClient().internalClient().execute(httpRequest);"transport = " + transport);
if (transport.isFailed()) {
@ -143,7 +143,8 @@ public abstract class HttpAction<R extends ActionRequest, T extends ActionRespon
if (xContentType == null) {
throw new IllegalStateException("unsupported content-type: " + mediaType);
try (XContentParser parser = xContentType.xContent().createParser(httpActionContext.getHttpClient().getRegistry(),
try (XContentParser parser = xContentType.xContent()
httpActionContext.getHttpResponse().content().array())) {
return entityParser().apply(parser);

@ -1,4 +1,4 @@
package org.xbib.elasticsearch.client.http;
package org.xbib.elx.http;
import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.action.ActionRequest;
@ -13,7 +13,7 @@ import org.xbib.netty.http.client.transport.Transport;
public class HttpActionContext<R extends ActionRequest, T extends ActionResponse> {
private final HttpClient httpClient;
private final ExtendedHttpClient extendedHttpClient;
private final R request;
@ -23,14 +23,14 @@ public class HttpActionContext<R extends ActionRequest, T extends ActionResponse
private FullHttpResponse httpResponse;
HttpActionContext(HttpClient httpClient, R request, String url) {
this.httpClient = httpClient;
HttpActionContext(ExtendedHttpClient extendedHttpClient, R request, String url) {
this.extendedHttpClient = extendedHttpClient;
this.request = request;
this.url = url;
public HttpClient getHttpClient() {
return httpClient;
public ExtendedHttpClient getExtendedHttpClient() {
return extendedHttpClient;
public R getRequest() {

@ -1,4 +1,4 @@
package org.xbib.elasticsearch.client.http;
package org.xbib.elx.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
@ -13,8 +13,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class HttpActionFuture<T, L> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<L> {
private Transport httpClientTransport;

@ -0,0 +1,135 @@
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class HttpClusterHealthAction extends HttpAction<ClusterHealthRequest, ClusterHealthResponse> {
public ClusterHealthAction getActionInstance() {
return ClusterHealthAction.INSTANCE;
protected RequestBuilder createHttpRequest(String url, ClusterHealthRequest request) {
return newPutRequest(url, "/_cluster/health");
protected CheckedFunction<XContentParser, ClusterHealthResponse, IOException> entityParser() {
throw new UnsupportedOperationException();
private static final String CLUSTER_NAME = "cluster_name";
private static final String STATUS = "status";
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis";
private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number";
private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String INDICES = "indices";
private static final ConstructingObjectParser<ClusterHealthResponse, Void> PARSER =
new ConstructingObjectParser<>("cluster_health_response", true,
parsedObjects -> {
int i = 0;
// ClusterStateHealth fields
int numberOfNodes = (int) parsedObjects[i++];
int numberOfDataNodes = (int) parsedObjects[i++];
int activeShards = (int) parsedObjects[i++];
int relocatingShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
int initializingShards = (int) parsedObjects[i++];
int unassignedShards = (int) parsedObjects[i++];
double activeShardsPercent = (double) parsedObjects[i++];
String statusStr = (String) parsedObjects[i++];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
@SuppressWarnings("unchecked") List<ClusterIndexHealth> indexList =
(List<ClusterIndexHealth>) parsedObjects[i++];
final Map<String, ClusterIndexHealth> indices;
if (indexList == null || indexList.isEmpty()) {
indices = emptyMap();
} else {
indices = new HashMap<>(indexList.size());
for (ClusterIndexHealth indexHealth : indexList) {
indices.put(indexHealth.getIndex(), indexHealth);
/*ClusterStateHealth stateHealth = new ClusterStateHealth(activePrimaryShards, activeShards, relocatingShards,
initializingShards, unassignedShards, numberOfNodes, numberOfDataNodes, activeShardsPercent, status,
//ClusterState clusterState = new ClusterState();
//ClusterStateHealth clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
// ClusterHealthResponse fields
String clusterName = (String) parsedObjects[i++];
int numberOfPendingTasks = (int) parsedObjects[i++];
int numberOfInFlightFetch = (int) parsedObjects[i++];
int delayedUnassignedShards = (int) parsedObjects[i++];
long taskMaxWaitingTimeMillis = (long) parsedObjects[i++];
boolean timedOut = (boolean) parsedObjects[i];
return new ClusterHealthResponse(clusterName, null, null, numberOfPendingTasks,
numberOfInFlightFetch, delayedUnassignedShards,
/*return new ClusterHealthResponse(clusterName, numberOfPendingTasks, numberOfInFlightFetch,
TimeValue.timeValueMillis(taskMaxWaitingTimeMillis), timedOut, stateHealth);*/
// private static final ObjectParser.NamedObjectParser<ClusterIndexHealth, Void> INDEX_PARSER =
// (XContentParser parser, Void context, String index) -> ClusterIndexHealth.innerFromXContent(parser, index);
static {
// ClusterStateHealth fields
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_NODES));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_DATA_NODES));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareDouble(constructorArg(), new ParseField(ACTIVE_SHARDS_PERCENT_AS_NUMBER));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
// Can be absent if LEVEL == 'cluster'
//PARSER.declareNamedObjects(optionalConstructorArg(), INDEX_PARSER, new ParseField(INDICES));
// ClusterHealthResponse fields
PARSER.declareString(constructorArg(), new ParseField(CLUSTER_NAME));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_PENDING_TASKS));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_IN_FLIGHT_FETCH));
PARSER.declareInt(constructorArg(), new ParseField(DELAYED_UNASSIGNED_SHARDS));
PARSER.declareLong(constructorArg(), new ParseField(TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS));
PARSER.declareBoolean(constructorArg(), new ParseField(TIMED_OUT));

@ -1,8 +1,13 @@
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedFunction;
@ -16,8 +21,8 @@ import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
import org.xbib.elasticsearch.client.http.HttpAction;
import org.xbib.elasticsearch.client.http.HttpActionContext;
import org.xbib.elx.http.HttpAction;
import org.xbib.elx.http.HttpActionContext;
import org.xbib.netty.http.client.RequestBuilder;
@ -106,11 +111,11 @@ public class HttpNodesInfoAction extends HttpAction<NodesInfoRequest, NodesInfoR
String nodeId = entry.getKey();
String ephemeralId = null;
Map<String,Object> map2 = (Map<String, Object>) entry.getValue();
String nodeName = (String)map2.get("name");
String hostName = (String)map2.get("host");
String hostAddress = (String)map2.get("ip");
String nodeName = (String) map2.get("name");
String hostName = (String) map2.get("host");
String hostAddress = (String) map2.get("ip");
// <host>[/<ip>][:<port>]
String transportAddressString = (String)map2.get("transport_address");
String transportAddressString = (String) map2.get("transport_address");
int pos = transportAddressString.indexOf(':');
String host = pos > 0 ? transportAddressString.substring(0, pos) : transportAddressString;
int port = Integer.parseInt(pos > 0 ? transportAddressString.substring(pos + 1) : "0");
@ -121,8 +126,8 @@ public class HttpNodesInfoAction extends HttpAction<NodesInfoRequest, NodesInfoR
TransportAddress transportAddress = new TransportAddress(inetAddresses[0], port);
Build build = new Build(Build.Flavor.OSS, Build.Type.TAR,
(String) map2.get("build"),
(String) map2.get("date"),
(Boolean) map2.get("snapshot"));
Map<String, String> attributes = Collections.emptyMap();
Set<DiscoveryNode.Role> roles = new HashSet<>();
Version version = Version.fromString((String) map2.get("version"));

@ -1,11 +1,14 @@
package org.elasticsearch.action.admin.cluster.settings;
package org.xbib.elx.http.action.admin.cluster.settings;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.xbib.elasticsearch.client.http.HttpAction;
import org.xbib.elx.http.HttpAction;
import org.xbib.netty.http.client.RequestBuilder;
@ -13,9 +16,6 @@ import;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class HttpClusterUpdateSettingsAction extends HttpAction<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse> {
@ -41,9 +41,6 @@ public class HttpClusterUpdateSettingsAction extends HttpAction<ClusterUpdateSet
protected CheckedFunction<XContentParser, ClusterUpdateSettingsResponse, IOException> entityParser() {
return parser -> {
// TODO(jprante)
return new ClusterUpdateSettingsResponse();
return ClusterUpdateSettingsResponse::fromXContent;

Some files were not shown because too many files have changed in this diff Show more