clean up extended client

This commit is contained in:
Jörg Prante 2019-02-19 17:29:51 +01:00
parent c0dfb9a617
commit 4701447d3c
30 changed files with 929 additions and 849 deletions

View file

@ -67,7 +67,7 @@ subprojects {
}
clean {
delete "plugins"
delete "data"
delete "logs"
delete "out"
}

View file

@ -16,4 +16,6 @@ public interface BulkControl {
Map<String, Long> getStartBulkRefreshIntervals();
Map<String, Long> getStopBulkRefreshIntervals();
String getMaxWaitTime();
}

View file

@ -10,17 +10,16 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Interface for providing extended administrative methods for managing and ingesting Elasticsearch.
* Interface for extended managing and indexing methods of an Elasticsearch client.
*/
public interface ExtendedClient {
/**
* Set an Elasticsearch client to extend from it. May be null for TransportClient.
* @param client client
* @return an ELasticsearch client
* @return this client
*/
ExtendedClient setClient(ElasticsearchClient client);
@ -31,16 +30,8 @@ public interface ExtendedClient {
*/
ElasticsearchClient getClient();
ExtendedClient setBulkMetric(BulkMetric bulkMetric);
BulkMetric getBulkMetric();
ExtendedClient setBulkControl(BulkControl bulkControl);
BulkControl getBulkControl();
/**
* Create new Elasticsearch client, wrap an existing Elasticsearch client.
* Initiative the extended client, cerates instances and connect to cluster, if required.
*
* @param settings settings
* @return this client
@ -49,57 +40,91 @@ public interface ExtendedClient {
ExtendedClient init(Settings settings) throws IOException;
/**
* Bulked index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
* Set bulk metric.
* @param bulkMetric the bulk metric
* @return this client
*/
ExtendedClient setBulkMetric(BulkMetric bulkMetric);
/**
* Get bulk metric.
* @return the bulk metric
*/
BulkMetric getBulkMetric();
/**
* Set bulk control.
* @param bulkControl the bulk control
* @return this
*/
ExtendedClient setBulkControl(BulkControl bulkControl);
/**
* Get buulk control.
* @return the bulk control
*/
BulkControl getBulkControl();
/**
* Build index definition from settings.
*
* @param index the index name
* @param settings the settings for the index
* @return index definition
* @throws IOException if settings/mapping URL is invalid/malformed
*/
IndexDefinition buildIndexDefinitionFromSettings(String index, Settings settings) throws IOException;
/**
* Add index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded.
*
* @param index the index
* @param type the type
* @param id the id
* @param create true if document must be created
* @param source the source
* @return this
*/
ExtendedClient index(String index, String type, String id, boolean create, BytesReference source);
ExtendedClient index(String index, String id, boolean create, BytesReference source);
/**
* Index document.
* Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when limits are exceeded.
*
* @param index the index
* @param type the type
* @param id the id
* @param create true if document is to be created, false otherwise
* @param source the source
* @return this client methods
*/
ExtendedClient index(String index, String type, String id, boolean create, String source);
ExtendedClient index(String index, String id, boolean create, String source);
/**
* Bulked index request. Each request will be added to a queue for bulking requests.
* Index request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
*
* @param indexRequest the index request to add
* @return this ingest
* @return this
*/
ExtendedClient indexRequest(IndexRequest indexRequest);
ExtendedClient index(IndexRequest indexRequest);
/**
* Delete document.
* Delete request.
*
* @param index the index
* @param type the type
* @param id the id
* @return this ingest
* @return this
*/
ExtendedClient delete(String index, String type, String id);
ExtendedClient delete(String index, String id);
/**
* Bulked delete request. Each request will be added to a queue for bulking requests.
* Delete request. Each request will be added to a queue for bulking requests.
* Submitting request will be done when bulk limits are exceeded.
*
* @param deleteRequest the delete request to add
* @return this ingest
* @return this
*/
ExtendedClient deleteRequest(DeleteRequest deleteRequest);
ExtendedClient delete(DeleteRequest deleteRequest);
/**
* Bulked update request. Each request will be added to a queue for bulking requests.
@ -107,23 +132,21 @@ public interface ExtendedClient {
* Note that updates only work correctly when all operations between nodes are synchronized.
*
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
*/
ExtendedClient update(String index, String type, String id, BytesReference source);
ExtendedClient update(String index, String id, BytesReference source);
/**
* Update document. Use with precaution! Does not work in all cases.
*
* @param index the index
* @param type the type
* @param id the id
* @param source the source
* @return this
*/
ExtendedClient update(String index, String type, String id, String source);
ExtendedClient update(String index, String id, String source);
/**
* Bulked update request. Each request will be added to a queue for bulking requests.
@ -131,207 +154,225 @@ public interface ExtendedClient {
* Note that updates only work correctly when all operations between nodes are synchronized.
*
* @param updateRequest the update request to add
* @return this ingest
* @return this
*/
ExtendedClient updateRequest(UpdateRequest updateRequest);
/**
* Set the maximum number of actions per request.
*
* @param maxActionsPerRequest maximum number of actions per request
* @return this ingest
*/
ExtendedClient maxActionsPerRequest(int maxActionsPerRequest);
/**
* Set the maximum concurent requests.
*
* @param maxConcurentRequests maximum number of concurrent ingest requests
* @return this Ingest
*/
ExtendedClient maxConcurrentRequests(int maxConcurentRequests);
/**
* Set the maximum volume for request before flush.
*
* @param maxVolume maximum volume
* @return this ingest
*/
ExtendedClient maxVolumePerRequest(String maxVolume);
/**
* Set the flush interval for automatic flushing outstanding ingest requests.
*
* @param flushInterval the flush interval, default is 30 seconds
* @return this ingest
*/
ExtendedClient flushIngestInterval(String flushInterval);
/**
* Set mapping.
*
* @param type mapping type
* @param in mapping definition as input stream
* @throws IOException if mapping could not be added
*/
void mapping(String type, InputStream in) throws IOException;
/**
* Set mapping.
*
* @param type mapping type
* @param mapping mapping definition as input stream
* @throws IOException if mapping could not be added
*/
void mapping(String type, String mapping) throws IOException;
/**
* Put mapping.
*
* @param index index
*/
void putMapping(String index);
ExtendedClient update(UpdateRequest updateRequest);
/**
* Create a new index.
*
* @param index index
* @return this ingest
*/
ExtendedClient newIndex(String index);
/**
* Create a new index.
*
* @param index index
* @param type type
* @param settings settings
* @param mappings mappings
* @return this ingest
* @return this
* @throws IOException if new index creation fails
*/
ExtendedClient newIndex(String index, String type, InputStream settings, InputStream mappings) throws IOException;
ExtendedClient newIndex(String index) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param index index
* @param settings settings
* @param mappings mappings
* @return this ingest
*/
ExtendedClient newIndex(String index, Settings settings, Map<String, String> mappings);
/**
* Create new mapping.
*
* @param index index
* @param type index type
* @param mapping mapping
* @return this ingest
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newMapping(String index, String type, Map<String, Object> mapping);
ExtendedClient newIndex(String index, InputStream settings, InputStream mapping) throws IOException;
/**
* Delete index.
* Create a new index.
*
* @param index index
* @return this ingest
* @param settings settings
* @param mapping mapping
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(String index, Settings settings, String mapping) throws IOException;
/**
* Create a new index.
*
* @param index index
* @param settings settings
* @param mapping mapping
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(String index, Settings settings, Map<String, Object> mapping) throws IOException;
/**
* Create a new index.
* @param indexDefinition the index definition
* @return this
* @throws IOException if settings/mapping is invalid or index creation fails
*/
ExtendedClient newIndex(IndexDefinition indexDefinition) throws IOException;
/**
* Delete an index.
* @param indexDefinition the index definition
* @return this
*/
ExtendedClient deleteIndex(IndexDefinition indexDefinition);
/**
* Delete an index.
*
* @param index index
* @return this
*/
ExtendedClient deleteIndex(String index);
/**
* Start bulk mode for indexes.
* @param indexDefinition index definition
* @return this
* @throws IOException if bulk could not be started
*/
ExtendedClient startBulk(IndexDefinition indexDefinition) throws IOException;
/**
* Start bulk mode.
*
* @param index index
* @param index index
* @param startRefreshIntervalSeconds refresh interval before bulk
* @param stopRefreshIntervalSeconds refresh interval after bulk
* @return this ingest
* @return this
* @throws IOException if bulk could not be started
*/
ExtendedClient startBulk(String index, long startRefreshIntervalSeconds,
long stopRefreshIntervalSeconds) throws IOException;
/**
* Stop bulk mode.
*
* @param indexDefinition index definition
* @return this
* @throws IOException if bulk could not be startet
*/
ExtendedClient stopBulk(IndexDefinition indexDefinition) throws IOException;
/**
* Stops bulk mode.
*
* @param index index
* @return this Ingest
* @param maxWaitTime maximum wait time
* @return this
* @throws IOException if bulk could not be stopped
*/
ExtendedClient stopBulk(String index) throws IOException;
ExtendedClient stopBulk(String index, String maxWaitTime) throws IOException;
/**
* Flush ingest, move all pending documents to the cluster.
* Flush bulk indexing, move all pending documents to the cluster.
*
* @return this
*/
ExtendedClient flushIngest();
/**
* Wait for all outstanding responses.
*
* @param maxWaitTime maximum wait time
* @return this ingest
* @throws InterruptedException if wait is interrupted
* @throws ExecutionException if execution failed
* Update replica level.
* @param indexDefinition the index definition
* @param level the replica level
* @return this
* @throws IOException if replica setting could not be updated
*/
ExtendedClient waitForResponses(String maxWaitTime) throws InterruptedException, ExecutionException;
/**
* Refresh the index.
*
* @param index index
*/
void refreshIndex(String index);
/**
* Flush the index.
*
* @param index index
*/
void flushIndex(String index);
ExtendedClient updateReplicaLevel(IndexDefinition indexDefinition, int level) throws IOException;
/**
* Update replica level.
*
* @param index index
* @param level the replica level
* @return number of shards after updating replica level
* @throws IOException if replica could not be updated
* @param maxWaitTime maximum wait time
* @return this
* @throws IOException if replica setting could not be updated
*/
int updateReplicaLevel(String index, int level) throws IOException;
ExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) throws IOException;
/**
* Get replica level.
* @param indexDefinition the index name
* @return the replica level of the index
*/
int getReplicaLevel(IndexDefinition indexDefinition);
/**
* Get replica level.
* @param index the index name
* @return the replica level of the index
*/
int getReplicaLevel(String index);
/**
* Refresh the index.
*
* @param index index
* @return this
*/
ExtendedClient refreshIndex(String index);
/**
* Flush the index. The cluster clears cache and completes indexing.
*
* @param index index
* @return this
*/
ExtendedClient flushIndex(String index);
/**
* Force segment merge of an index.
* @param indexDefinition th eindex definition
* @return this
*/
boolean forceMerge(IndexDefinition indexDefinition);
/**
* Force segment merge of an index.
* @param index the index
* @param maxWaitTime maximum wait time
* @return this
*/
boolean forceMerge(String index, String maxWaitTime);
/**
* Wait for all outstanding bulk responses.
*
* @param maxWaitTime maximum wait time
* @return true if wait succeeded, false if wait timed out
*/
boolean waitForResponses(String maxWaitTime);
/**
* Wait for cluster being healthy.
*
* @param healthColor cluster health color to wait for
* @param timeValue time value
* @throws IOException if wait failed
* @param maxWaitTime time value
* @return true if wait succeeded, false if wait timed out
*/
void waitForCluster(String healthColor, String timeValue) throws IOException;
boolean waitForCluster(String healthColor, String maxWaitTime);
/**
* Get current health color.
*
* @param maxWaitTime maximum wait time
* @return the cluster health color
*/
String healthColor();
String getHealthColor(String maxWaitTime);
/**
* Wait for index recovery (after replica change).
*
* @param index index
* @return number of shards found
* @throws IOException if wait failed
* @param maxWaitTime maximum wait time
* @return true if wait succeeded, false if wait timed out
*/
int waitForRecovery(String index) throws IOException;
boolean waitForRecovery(String index, String maxWaitTime);
/**
* Resolve alias.
*
* @param alias the alias
* @return one index name behind the alias or the alias if there is no index
* @return this index name behind the alias or the alias if there is no index
*/
String resolveAlias(String alias);
@ -347,40 +388,72 @@ public interface ExtendedClient {
/**
* Get all alias filters.
*
* @param index index
* @param alias the alias
* @return map of alias filters
*/
Map<String, String> getAliasFilters(String index);
Map<String, String> getAliasFilters(String alias);
/**
* Switch aliases from one index to another.
*
* @param index the index name
* @param concreteIndex the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
* Get all index filters.
* @param index the index
* @return map of index filters
*/
void switchAliases(String index, String concreteIndex, List<String> extraAliases);
Map<String, String> getIndexFilters(String index);
/**
* Switch aliases from one index to another.
* Switch from one index to another.
* @param indexDefinition the index definition
* @param extraAliases new aliases
* @return this
*/
ExtendedClient switchIndex(IndexDefinition indexDefinition, List<String> extraAliases);
/**
* Switch from one index to another.
* @param indexDefinition the index definition
* @param extraAliases new aliases
* @param indexAliasAdder method to add aliases
* @return this
*/
ExtendedClient switchIndex(IndexDefinition indexDefinition, List<String> extraAliases, IndexAliasAdder indexAliasAdder);
/**
* Switch from one index to another.
*
* @param index the index name
* @param concreteIndex the index name with timestamp
* @param fullIndexName the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
* @return this
*/
ExtendedClient switchIndex(String index, String fullIndexName, List<String> extraAliases);
/**
* Switch from one index to another.
*
* @param index the index name
* @param fullIndexName the index name with timestamp
* @param extraAliases a list of names that should be set as index aliases
* @param adder an adder method to create alias term queries
* @return this
*/
void switchAliases(String index, String concreteIndex, List<String> extraAliases, IndexAliasAdder adder);
ExtendedClient switchIndex(String index, String fullIndexName, List<String> extraAliases, IndexAliasAdder adder);
/**
* Retention policy for an index. All indices before timestampdiff should be deleted,
* but mintokeep indices must be kept.
* Prune index.
* @param indexDefinition the index definition
*/
void pruneIndex(IndexDefinition indexDefinition);
/**
* Apply retention policy to prune indices. All indices before delta should be deleted,
* but the number of mintokeep indices must be kept.
*
* @param index index name
* @param concreteIndex index name with timestamp
* @param timestampdiff timestamp delta (for index timestamps)
* @param fullIndexName index name with timestamp
* @param delta timestamp delta (for index timestamps)
* @param mintokeep minimum number of indices to keep
*/
void performRetentionPolicy(String index, String concreteIndex, int timestampdiff, int mintokeep);
void pruneIndex(String index, String fullIndexName, int delta, int mintokeep);
/**
* Find the timestamp of the most recently indexed document in the index.
@ -413,7 +486,7 @@ public interface ExtendedClient {
Throwable getThrowable();
/**
* Shutdown the ingesting.
* Shutdown the client.
* @throws IOException if shutdown fails
*/
void shutdown() throws IOException;

View file

@ -1,13 +1,12 @@
package org.xbib.elx.common.management;
package org.xbib.elx.api;
import java.net.MalformedURLException;
import java.net.URL;
public class IndexDefinition {
private String index;
private String type;
private String fullIndexName;
private String dateTimePattern;
@ -28,6 +27,8 @@ public class IndexDefinition {
private IndexRetention indexRetention;
private String maxWaitTime;
public IndexDefinition setIndex(String index) {
this.index = index;
return this;
@ -46,15 +47,11 @@ public class IndexDefinition {
return fullIndexName;
}
public IndexDefinition setType(String type) {
this.type = type;
public IndexDefinition setSettingsUrl(String settingsUrlString) throws MalformedURLException {
this.settingsUrl = settingsUrlString != null ? new URL(settingsUrlString) : null;
return this;
}
public String getType() {
return type;
}
public IndexDefinition setSettingsUrl(URL settingsUrl) {
this.settingsUrl = settingsUrl;
return this;
@ -64,6 +61,11 @@ public class IndexDefinition {
return settingsUrl;
}
public IndexDefinition setMappingsUrl(String mappingsUrlString) throws MalformedURLException {
this.mappingsUrl = mappingsUrlString != null ? new URL(mappingsUrlString) : null;
return this;
}
public IndexDefinition setMappingsUrl(URL mappingsUrl) {
this.mappingsUrl = mappingsUrl;
return this;
@ -136,4 +138,12 @@ public class IndexDefinition {
return indexRetention;
}
public IndexDefinition setMaxWaitTime(String maxWaitTime) {
this.maxWaitTime = maxWaitTime;
return this;
}
public String getMaxWaitTime() {
return maxWaitTime;
}
}

View file

@ -1,4 +1,4 @@
package org.xbib.elx.common.management;
package org.xbib.elx.api;
public class IndexRetention {
@ -6,12 +6,12 @@ public class IndexRetention {
private int minToKeep;
public IndexRetention setTimestampDiff(int timestampDiff) {
public IndexRetention setDelta(int timestampDiff) {
this.timestampDiff = timestampDiff;
return this;
}
public int getTimestampDiff() {
public int getDelta() {
return timestampDiff;
}

View file

@ -38,24 +38,24 @@ public class BulkProcessor implements Closeable {
private final ScheduledFuture<?> scheduledFuture;
private final AtomicLong executionIdGen = new AtomicLong();
private final AtomicLong executionIdGen;
private final BulkRequestHandler bulkRequestHandler;
private BulkRequest bulkRequest;
private volatile boolean closed = false;
private volatile boolean closed;
private BulkProcessor(Client client, Listener listener, String name, int concurrentRequests,
int bulkActions, ByteSizeValue bulkSize, TimeValue flushInterval) {
this.executionIdGen = new AtomicLong();
this.closed = false;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = concurrentRequests == 0 ?
new SyncBulkRequestHandler(client, listener) :
new AsyncBulkRequestHandler(client, listener, concurrentRequests);
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
EsExecutors.daemonThreadFactory(client.settings(),
@ -83,6 +83,7 @@ public class BulkProcessor implements Closeable {
@Override
public void close() {
try {
// 0 = immediate close
awaitClose(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
@ -90,8 +91,27 @@ public class BulkProcessor implements Closeable {
}
/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are
* flushed.
* Wait for bulk request handler with flush.
* @param timeout the timeout value
* @param unit the timeout unit
* @return true is method was successful, false if timeout
* @throws InterruptedException if timeout
*/
public boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
}
// flush
if (bulkRequest.numberOfActions() > 0) {
execute();
}
// wait for all bulk responses
return this.bulkRequestHandler.close(timeout, unit);
}
/**
* Closes the processor. Any remaining bulk actions are flushed and then closed. This emthod can only be called
* once as the last action of a bulk processor.
*
* If concurrent requests are not enabled, returns {@code true} immediately.
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then
@ -116,7 +136,7 @@ public class BulkProcessor implements Closeable {
if (bulkRequest.numberOfActions() > 0) {
execute();
}
return this.bulkRequestHandler.awaitClose(timeout, unit);
return this.bulkRequestHandler.close(timeout, unit);
}
/**
@ -257,7 +277,7 @@ public class BulkProcessor implements Closeable {
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private ByteSizeValue bulkSize = new ByteSizeValue(10, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
@ -367,7 +387,7 @@ public class BulkProcessor implements Closeable {
void execute(BulkRequest bulkRequest, long executionId);
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
boolean close(long timeout, TimeUnit unit) throws InterruptedException;
}
@ -398,7 +418,7 @@ public class BulkProcessor implements Closeable {
}
@Override
public boolean awaitClose(long timeout, TimeUnit unit) {
public boolean close(long timeout, TimeUnit unit) {
return true;
}
}
@ -461,7 +481,7 @@ public class BulkProcessor implements Closeable {
}
@Override
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
public boolean close(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
semaphore.release(concurrentRequests);
return true;

View file

@ -6,8 +6,6 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
* Mock client, it does not perform actions on a cluster. Useful for testing or dry runs.
*/
@ -29,52 +27,32 @@ public class MockExtendedClient extends AbstractExtendedClient {
}
@Override
public MockExtendedClient maxActionsPerRequest(int maxActions) {
public MockExtendedClient index(String index, String id, boolean create, String source) {
return this;
}
@Override
public MockExtendedClient maxConcurrentRequests(int maxConcurrentRequests) {
public MockExtendedClient delete(String index, String id) {
return this;
}
@Override
public MockExtendedClient maxVolumePerRequest(String maxVolumePerRequest) {
public MockExtendedClient update(String index, String id, String source) {
return this;
}
@Override
public MockExtendedClient flushIngestInterval(String interval) {
public MockExtendedClient index(IndexRequest indexRequest) {
return this;
}
@Override
public MockExtendedClient index(String index, String type, String id, boolean create, String source) {
public MockExtendedClient delete(DeleteRequest deleteRequest) {
return this;
}
@Override
public MockExtendedClient delete(String index, String type, String id) {
return this;
}
@Override
public MockExtendedClient update(String index, String type, String id, String source) {
return this;
}
@Override
public MockExtendedClient indexRequest(IndexRequest indexRequest) {
return this;
}
@Override
public MockExtendedClient deleteRequest(DeleteRequest deleteRequest) {
return this;
}
@Override
public MockExtendedClient updateRequest(UpdateRequest updateRequest) {
public MockExtendedClient update(UpdateRequest updateRequest) {
return this;
}
@ -83,23 +61,13 @@ public class MockExtendedClient extends AbstractExtendedClient {
return this;
}
@Override
public MockExtendedClient waitForResponses(String timeValue) {
return this;
}
@Override
public MockExtendedClient startBulk(String index, long startRefreshInterval, long stopRefreshIterval) {
return this;
}
@Override
public MockExtendedClient stopBulk(String index) {
return this;
}
@Override
public MockExtendedClient deleteIndex(String index) {
public MockExtendedClient stopBulk(String index, String maxWaitTime) {
return this;
}
@ -109,34 +77,43 @@ public class MockExtendedClient extends AbstractExtendedClient {
}
@Override
public MockExtendedClient newMapping(String index, String type, Map<String, Object> mapping) {
public MockExtendedClient deleteIndex(String index) {
return this;
}
@Override
public void putMapping(String index) {
public MockExtendedClient refreshIndex(String index) {
return this;
}
@Override
public void refreshIndex(String index) {
public MockExtendedClient flushIndex(String index) {
return this;
}
@Override
public void flushIndex(String index) {
public boolean forceMerge(String index, String maxWaitTime) {
return true;
}
@Override
public void waitForCluster(String healthColor, String timeValue) {
public boolean waitForCluster(String healthColor, String timeValue) {
return true;
}
@Override
public int waitForRecovery(String index) {
return -1;
public boolean waitForResponses(String maxWaitTime) {
return true;
}
@Override
public int updateReplicaLevel(String index, int level) {
return -1;
public boolean waitForRecovery(String index, String maxWaitTime) {
return true;
}
@Override
public MockExtendedClient updateReplicaLevel(String index, int level, String maxWaitTime) {
return this;
}
@Override

View file

@ -8,7 +8,7 @@ public enum Parameters {
DEFAULT_MAX_VOLUME_PER_REQUEST("10mb"),
DEFAULT_FLUSH_INTERVAL("30s"),
DEFAULT_FLUSH_INTERVAL(30),
MAX_ACTIONS_PER_REQUEST ("max_actions_per_request"),

View file

@ -11,19 +11,26 @@ import java.util.Set;
*/
public class SimpleBulkControl implements BulkControl {
private final Set<String> indexNames = new HashSet<>();
private final Set<String> indexNames;
private final Map<String, Long> startBulkRefreshIntervals = new HashMap<>();
private final Map<String, Long> startBulkRefreshIntervals;
private final Map<String, Long> stopBulkRefreshIntervals = new HashMap<>();
private final Map<String, Long> stopBulkRefreshIntervals;
private String maxWaitTime;
public SimpleBulkControl() {
indexNames = new HashSet<>();
startBulkRefreshIntervals = new HashMap<>();
stopBulkRefreshIntervals = new HashMap<>();
maxWaitTime = "30s";
}
@Override
public void startBulk(String indexName, long startRefreshInterval, long stopRefreshInterval) {
synchronized (indexNames) {
indexNames.add(indexName);
startBulkRefreshIntervals.put(indexName, startRefreshInterval);
stopBulkRefreshIntervals.put(indexName, stopRefreshInterval);
}
indexNames.add(indexName);
startBulkRefreshIntervals.put(indexName, startRefreshInterval);
stopBulkRefreshIntervals.put(indexName, stopRefreshInterval);
}
@Override
@ -33,9 +40,7 @@ public class SimpleBulkControl implements BulkControl {
@Override
public void finishBulk(String indexName) {
synchronized (indexNames) {
indexNames.remove(indexName);
}
indexNames.remove(indexName);
}
@Override
@ -53,4 +58,9 @@ public class SimpleBulkControl implements BulkControl {
return stopBulkRefreshIntervals;
}
@Override
public String getMaxWaitTime() {
return maxWaitTime;
}
}

View file

@ -1 +0,0 @@
package org.xbib.elx.common.management;

View file

@ -12,11 +12,11 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.Strings;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
@ -32,9 +32,10 @@ public class AliasTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(AliasTest.class.getName());
@Test
public void testAlias() throws IOException {
public void testAlias() {
Client client = client("1");
CreateIndexRequest indexRequest = new CreateIndexRequest("test");
client("1").admin().indices().create(indexRequest).actionGet();
client.admin().indices().create(indexRequest).actionGet();
// put alias
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[]{"test"};
@ -42,11 +43,11 @@ public class AliasTest extends NodeTestUtils {
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);
indicesAliasesRequest.addAliasAction(aliasAction);
client("1").admin().indices().aliases(indicesAliasesRequest).actionGet();
client.admin().indices().aliases(indicesAliasesRequest).actionGet();
// get alias
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(Strings.EMPTY_ARRAY);
long t0 = System.nanoTime();
GetAliasesResponse getAliasesResponse = client("1").admin().indices().getAliases(getAliasesRequest).actionGet();
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(getAliasesRequest).actionGet();
long t1 = (System.nanoTime() - t0) / 1000000;
logger.info("{} time(ms) = {}", getAliasesResponse.getAliases(), t1);
assertTrue(t1 >= 0);
@ -54,22 +55,23 @@ public class AliasTest extends NodeTestUtils {
@Test
public void testMostRecentIndex() {
Client client = client("1");
String alias = "test";
CreateIndexRequest indexRequest = new CreateIndexRequest("test20160101");
client("1").admin().indices().create(indexRequest).actionGet();
client.admin().indices().create(indexRequest).actionGet();
indexRequest = new CreateIndexRequest("test20160102");
client("1").admin().indices().create(indexRequest).actionGet();
client.admin().indices().create(indexRequest).actionGet();
indexRequest = new CreateIndexRequest("test20160103");
client("1").admin().indices().create(indexRequest).actionGet();
client.admin().indices().create(indexRequest).actionGet();
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
String[] indices = new String[]{"test20160101", "test20160102", "test20160103"};
String[] aliases = new String[]{alias};
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(AliasAction.Type.ADD, indices, aliases);
indicesAliasesRequest.addAliasAction(aliasAction);
client("1").admin().indices().aliases(indicesAliasesRequest).actionGet();
client.admin().indices().aliases(indicesAliasesRequest).actionGet();
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client("1"),
GetAliasesRequestBuilder getAliasesRequestBuilder = new GetAliasesRequestBuilder(client,
GetAliasesAction.INSTANCE);
GetAliasesResponse getAliasesResponse = getAliasesRequestBuilder.setAliases(alias).execute().actionGet();
Pattern pattern = Pattern.compile("^(.*?)(\\d+)$");

View file

@ -10,10 +10,6 @@ import java.util.Collection;
public class MockNode extends Node {
public MockNode() {
super(Settings.EMPTY);
}
public MockNode(Settings settings) {
super(settings);
}

View file

@ -32,7 +32,7 @@ public class ExtendeNodeDuplicateIDTest extends NodeTestUtils {
try {
client.newIndex("test");
for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");

View file

@ -1,39 +0,0 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ExtendedNodeClientSingleNodeTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeClientSingleNodeTest.class.getSimpleName());
@Test
public void testSingleDocNodeClient() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
try {
client.newIndex("test");
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
assertEquals(1, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
}
}
}

View file

@ -3,6 +3,7 @@ package org.xbib.elx.node;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -12,6 +13,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -44,7 +46,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
}
@Test
public void testSingleDocNodeClient() throws Exception {
public void testSingleDoc() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.put(Parameters.MAX_ACTIONS_PER_REQUEST.name(), MAX_ACTIONS_PER_REQUEST)
@ -52,11 +54,9 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
.build();
try {
client.newIndex("test");
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
@ -70,7 +70,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
}
@Test
public void testNewIndexNodeClient() throws Exception {
public void testNewIndex() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
@ -84,14 +84,14 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
}
@Test
public void testMappingNodeClient() throws Exception {
public void testMapping() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject("test")
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
@ -99,12 +99,12 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
.endObject()
.endObject()
.endObject();
client.mapping("test", builder.string());
client.newIndex("test");
client.newIndex("test", Settings.EMPTY, builder.string());
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
GetMappingsResponse getMappingsResponse =
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
@ -113,7 +113,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
}
@Test
public void testRandomDocsNodeClient() throws Exception {
public void testRandomDocs() throws Exception {
long numactions = ACTIONS;
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
@ -123,7 +123,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
try {
client.newIndex("test");
for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
@ -145,7 +145,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
}
@Test
public void testThreadedRandomDocsNodeClient() throws Exception {
public void testThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors();
Long maxActionsPerRequest = MAX_ACTIONS_PER_REQUEST;
final Long actions = ACTIONS;
@ -165,7 +165,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
for (int i = 0; i < maxthreads; i++) {
pool.execute(() -> {
for (int i1 = 0; i1 < actions; i1++) {
client.index("test", "test", null, false,"{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", null, false,"{ \"name\" : \"" + randomString(32) + "\"}");
}
latch.countDown();
});
@ -184,7 +184,7 @@ public class ExtendedNodeClientTest extends NodeTestUtils {
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test");
client.stopBulk("test", "30s");
assertEquals(maxthreads * actions, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());

View file

@ -27,23 +27,23 @@ public class ExtendedNodeIndexAliasTest extends NodeTestUtils {
try {
client.newIndex("test1234");
for (int i = 0; i < 1; i++) {
client.index("test1234", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.refreshIndex("test1234");
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.switchAliases("test", "test1234", simpleAliases);
client.switchIndex("test", "test1234", simpleAliases);
client.newIndex("test5678");
for (int i = 0; i < 1; i++) {
client.index("test5678", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.refreshIndex("test5678");
simpleAliases = Arrays.asList("d", "e", "f");
client.switchAliases("test", "test5678", simpleAliases, (builder, index, alias) ->
client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> aliases = client.getIndexFilters("test5678");
logger.info("aliases of index test5678 = {}", aliases);

View file

@ -17,6 +17,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -51,14 +52,14 @@ public class ExtendedNodeReplicaTest extends NodeTestUtils {
.build();
try {
client.newIndex("test1", settingsTest1, null)
.newIndex("test2", settingsTest2, null);
client.newIndex("test1", settingsTest1, new HashMap<>())
.newIndex("test2", settingsTest2, new HashMap<>());
client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 1234; i++) {
client.index("test1", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
for (int i = 0; i < 1234; i++) {
client.index("test2", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");

View file

@ -0,0 +1,69 @@
package org.xbib.elx.node;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.SimpleBulkControl;
import org.xbib.elx.common.SimpleBulkMetric;
import org.xbib.elx.api.IndexDefinition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ExtendedNodeSmokeTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedNodeSmokeTest.class.getSimpleName());
@Test
public void smokeTest() throws Exception {
final ExtendedNodeClient client = ClientBuilder.builder(client("1"))
.provider(ExtendedNodeClientProvider.class)
.build();
try {
client.setBulkControl(new SimpleBulkControl());
client.setBulkMetric(new SimpleBulkMetric());
client.newIndex("test");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
assertEquals(clusterName, client.getClusterName());
client.checkMapping("test");
client.update("test", "1", "{ \"name\" : \"Another name\"}");
client.flushIngest();
client.waitForRecovery("test", "10s");
client.delete("test", "1");
client.deleteIndex("test");
IndexDefinition indexDefinition = client.buildIndexDefinitionFromSettings("test2", Settings.settingsBuilder()
.build());
assertEquals(0, indexDefinition.getReplicaLevel());
client.newIndex(indexDefinition);
client.index(indexDefinition.getFullIndexName(), "1", true, "{ \"name\" : \"Hello World\"}");
client.flushIngest();
client.updateReplicaLevel(indexDefinition, 2);
int replica = client.getReplicaLevel(indexDefinition);
assertEquals(2, replica);
client.deleteIndex(indexDefinition);
assertEquals(0, client.getBulkMetric().getFailed().getCount());
assertEquals(4, client.getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
}
}
}

View file

@ -8,6 +8,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -38,15 +40,15 @@ public class ExtendedNodeUpdateReplicaLevelTest extends NodeTestUtils {
.build();
try {
client.newIndex("replicatest", settings, null);
client.newIndex("replicatest", settings, new HashMap<>());
client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) {
client.index("replicatest", "replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("replicatest",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel);
assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
client.updateReplicaLevel("replicatest", replicaLevel, "30s");
//assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@ -47,11 +47,7 @@ public class NodeTestUtils {
private AtomicInteger counter = new AtomicInteger();
private String cluster;
private String host;
private int port;
protected String clusterName;
private static void deleteFiles() throws IOException {
Path directory = Paths.get(getHome() + "/data");
@ -115,7 +111,7 @@ public class NodeTestUtils {
}
protected void setClusterName() {
this.cluster = "test-helper-cluster-"
this.clusterName = "test-helper-cluster-"
+ NetworkUtils.getLocalAddress().getHostName()
+ "-" + System.getProperty("user.name")
+ "-" + counter.incrementAndGet();
@ -123,7 +119,7 @@ public class NodeTestUtils {
protected Settings getNodeSettings() {
return settingsBuilder()
.put("cluster.name", cluster)
.put("cluster.name", clusterName)
.put("cluster.routing.schedule", "50ms")
.put("cluster.routing.allocation.disk.threshold_enabled", false)
.put("discovery.zen.multicast.enabled", true)
@ -171,8 +167,8 @@ public class NodeTestUtils {
.publishAddress();
if (obj instanceof InetSocketTransportAddress) {
InetSocketTransportAddress address = (InetSocketTransportAddress) obj;
host = address.address().getHostName();
port = address.address().getPort();
String host = address.address().getHostName();
int port = address.address().getPort();
}
}

View file

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.jboss.netty.channel.DefaultChannelFuture;
import org.xbib.elx.common.AbstractExtendedClient;
import org.xbib.elx.common.util.NetworkUtils;
@ -36,15 +37,22 @@ public class ExtendedTransportClient extends AbstractExtendedClient {
+ " " + System.getProperty("java.vm.vendor")
+ " " + System.getProperty("java.vm.version")
+ " Elasticsearch " + Version.CURRENT.toString();
logger.info("creating transport client on {} with effective settings {}",
systemIdentifier, settings.getAsMap());
TransportClient.Builder builder = TransportClient.builder()
.settings(Settings.builder()
.put("cluster.name", settings.get("cluster.name"))
.put("processors", settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.ignore_cluster_name", true)
.build());
return builder.build();
Settings effectiveSettings = Settings.builder()
// for thread pool size
.put("processors",
settings.getAsInt("processors", Runtime.getRuntime().availableProcessors()))
.put("client.transport.sniff", false) // do not sniff
.put("client.transport.nodes_sampler_interval", "1m") // do not ping
.put("client.transport.ping_timeout", "1m") // wait for unresponsive nodes a very long time before disconnect
.put("client.transport.ignore_cluster_name", true) // connect to any cluster
// custom settings may override defaults
.put(settings)
.build();
logger.info("creating transport client on {} with custom settings {} and effective settings {}",
systemIdentifier, settings.getAsMap(), effectiveSettings.getAsMap());
// we need to disable dead lock check because we may have mixed node/transport clients
DefaultChannelFuture.setUseDeadLockChecker(false);
return TransportClient.builder().settings(effectiveSettings).build();
}
return null;
}

View file

@ -21,11 +21,9 @@ public class ExtendedTransportClientSingleNodeTest extends NodeTestUtils {
.build();
try {
client.newIndex("test");
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}"); // single doc ingest
client.flushIngest();
client.waitForResponses("30s");
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@ -2,25 +2,31 @@ package org.xbib.elx.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import org.xbib.elx.common.Parameters;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ExtendedTransportClientTest extends NodeTestUtils {
@ -41,7 +47,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
}
@Test
public void testBulkClient() throws Exception {
public void testClientIndexOp() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
@ -68,7 +74,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
}
@Test
public void testSingleDocBulkClient() throws Exception {
public void testSingleDoc() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
@ -77,11 +83,9 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
.build();
try {
client.newIndex("test");
client.index("test", "test", "1", true, "{ \"name\" : \"Hello World\"}");
client.index("test", "1", true, "{ \"name\" : \"Hello World\"}");
client.flushIngest();
client.waitForResponses("30s");
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
@ -95,7 +99,37 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
}
@Test
public void testRandomDocsBulkClient() throws Exception {
public void testMapping() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.put(getSettings())
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(5))
.build();
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject("doc")
.startObject("properties")
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject()
.endObject();
client.newIndex("test", Settings.EMPTY, builder.string());
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices("test");
GetMappingsResponse getMappingsResponse =
client.getClient().execute(GetMappingsAction.INSTANCE, getMappingsRequest).actionGet();
logger.info("mappings={}", getMappingsResponse.getMappings());
assertTrue(getMappingsResponse.getMappings().get("test").containsKey("doc"));
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
}
@Test
public void testRandomDocs() throws Exception {
long numactions = ACTIONS;
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
@ -106,12 +140,10 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
try {
client.newIndex("test");
for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
} catch (InterruptedException e) {
// ignore
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
@ -125,7 +157,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
}
@Test
public void testThreadedRandomDocsBulkClient() throws Exception {
public void testThreadedRandomDocs() throws Exception {
int maxthreads = Runtime.getRuntime().availableProcessors();
long maxactions = MAX_ACTIONS_PER_REQUEST;
final long maxloop = ACTIONS;
@ -142,7 +174,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
.put(Parameters.FLUSH_INTERVAL.name(), TimeValue.timeValueSeconds(60))
.build();
try {
client.newIndex("test", settingsForIndex, null)
client.newIndex("test", settingsForIndex, new HashMap<>())
.startBulk("test", -1, 1000);
ThreadPoolExecutor pool = EsExecutors.newFixed("bulkclient-test", maxthreads, 30,
EsExecutors.daemonThreadFactory("bulkclient-test"));
@ -150,7 +182,7 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
for (int i = 0; i < maxthreads; i++) {
pool.execute(() -> {
for (int i1 = 0; i1 < maxloop; i1++) {
client.index("test", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test",null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
latch.countDown();
});
@ -164,15 +196,16 @@ public class ExtendedTransportClientTest extends NodeTestUtils {
pool.shutdown();
logger.info("poot shut down");
}
client.stopBulk("test", "30s");
assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.stopBulk("test");
assertEquals(maxthreads * maxloop, client.getBulkMetric().getSucceeded().getCount());
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
// extra search lookup
client.refreshIndex("test");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
// to avoid NPE at org.elasticsearch.action.search.SearchRequest.writeTo(SearchRequest.java:580)

View file

@ -31,7 +31,7 @@ public class ExtendedTransportDuplicateIDTest extends NodeTestUtils {
try {
client.newIndex("test");
for (int i = 0; i < ACTIONS; i++) {
client.index("test", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");

View file

@ -4,7 +4,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
@ -13,8 +12,8 @@ import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Ignore
public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportIndexAliasTest.class.getSimpleName());
@ -23,43 +22,53 @@ public class ExtendedTransportIndexAliasTest extends NodeTestUtils {
public void testIndexAlias() throws Exception {
final ExtendedTransportClient client = ClientBuilder.builder()
.provider(ExtendedTransportClientProvider.class)
.build();
.put(getSettings()).build();
try {
client.newIndex("test1234");
for (int i = 0; i < 1; i++) {
client.index("test1234", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test1234", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.refreshIndex("test1234");
List<String> simpleAliases = Arrays.asList("a", "b", "c");
client.switchAliases("test", "test1234", simpleAliases);
client.switchIndex("test", "test1234", simpleAliases);
client.newIndex("test5678");
for (int i = 0; i < 1; i++) {
client.index("test5678", "test", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test5678", randomString(1), false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.refreshIndex("test5678");
simpleAliases = Arrays.asList("d", "e", "f");
client.switchAliases("test", "test5678", simpleAliases, (builder, index, alias) ->
client.switchIndex("test", "test5678", simpleAliases, (builder, index, alias) ->
builder.addAlias(index, alias, QueryBuilders.termQuery("my_key", alias)));
Map<String, String> aliases = client.getIndexFilters("test5678");
logger.info("aliases of index test5678 = {}", aliases);
Map<String, String> indexFilters = client.getIndexFilters("test5678");
logger.info("index filters of index test5678 = {}", indexFilters);
assertTrue(indexFilters.containsKey("a"));
assertTrue(indexFilters.containsKey("b"));
assertTrue(indexFilters.containsKey("c"));
assertTrue(indexFilters.containsKey("d"));
assertTrue(indexFilters.containsKey("e"));
aliases = client.getAliasFilters("test");
Map<String, String> aliases = client.getAliasFilters("test");
logger.info("aliases of alias test = {}", aliases);
assertTrue(aliases.containsKey("a"));
assertTrue(aliases.containsKey("b"));
assertTrue(aliases.containsKey("c"));
assertTrue(aliases.containsKey("d"));
assertTrue(aliases.containsKey("e"));
client.waitForResponses("30s");
assertFalse(client.hasThrowable());
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
client.waitForResponses("30s");
client.shutdown();
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
}
}
}

View file

@ -16,15 +16,13 @@ import org.elasticsearch.index.indexing.IndexingStats;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
*
*/
public class ExtendedTransportReplicaTest extends NodeTestUtils {
private static final Logger logger = LogManager.getLogger(ExtendedTransportReplicaTest.class.getSimpleName());
@ -53,29 +51,30 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils {
.build();
try {
client.newIndex("test1", settingsTest1, null)
.newIndex("test2", settingsTest2, null);
client.newIndex("test1", settingsTest1, new HashMap<>())
.newIndex("test2", settingsTest2, new HashMap<>());
client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 1234; i++) {
client.index("test1", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test1", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
for (int i = 0; i < 1234; i++) {
client.index("test2", "test", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("test2", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
client.refreshIndex("test1");
client.refreshIndex("test2");
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {
logger.info("refreshing");
client.refreshIndex("test1");
client.refreshIndex("test2");
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client.getClient(), SearchAction.INSTANCE)
.setIndices("test1", "test2")
.setQuery(matchAllQuery());
long hits = searchRequestBuilder.execute().actionGet().getHits().getTotalHits();
logger.info("query total hits={}", hits);
assertEquals(2468, hits);
// TODO move to api
IndicesStatsRequestBuilder indicesStatsRequestBuilder = new IndicesStatsRequestBuilder(client.getClient(),
IndicesStatsAction.INSTANCE).all();
IndicesStatsResponse response = indicesStatsRequestBuilder.execute().actionGet();
@ -93,16 +92,15 @@ public class ExtendedTransportReplicaTest extends NodeTestUtils {
}
}
try {
client.deleteIndex("test1")
.deleteIndex("test2");
client.deleteIndex("test1").deleteIndex("test2");
} catch (Exception e) {
logger.error("delete index failed, ignored. Reason:", e);
}
client.shutdown();
if (client.hasThrowable()) {
logger.error("error", client.getThrowable());
}
assertFalse(client.hasThrowable());
client.shutdown();
}
}

View file

@ -7,6 +7,8 @@ import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.xbib.elx.common.ClientBuilder;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -37,15 +39,15 @@ public class ExtendedTransportUpdateReplicaLevelTest extends NodeTestUtils {
.build();
try {
client.newIndex("replicatest", settings, null);
client.newIndex("replicatest", settings, new HashMap<>());
client.waitForCluster("GREEN", "30s");
for (int i = 0; i < 12345; i++) {
client.index("replicatest", "replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
client.index("replicatest", null, false, "{ \"name\" : \"" + randomString(32) + "\"}");
}
client.flushIngest();
client.waitForResponses("30s");
shardsAfterReplica = client.updateReplicaLevel("replicatest", replicaLevel);
assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
client.updateReplicaLevel("replicatest", replicaLevel, "30s");
//assertEquals(shardsAfterReplica, numberOfShards * (replicaLevel + 1));
} catch (NoNodeAvailableException e) {
logger.warn("skipping, no node available");
} finally {

View file

@ -1,6 +1,6 @@
group = org.xbib
name = elx
version = 2.2.1.3
version = 2.2.1.4
xbib-metrics.version = 1.1.0
xbib-guice.version = 4.0.4

View file

@ -1,41 +0,0 @@
tasks.withType(FindBugs) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = false
}
}
tasks.withType(Pmd) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
}
}
tasks.withType(Checkstyle) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
}
}
jacocoTestReport {
reports {
xml.enabled true
csv.enabled false
xml.destination "${buildDir}/reports/jacoco-xml"
html.destination "${buildDir}/reports/jacoco-html"
}
}
sonarqube {
properties {
property "sonar.projectName", "${project.group} ${project.name}"
property "sonar.sourceEncoding", "UTF-8"
property "sonar.tests", "src/integration-test/java"
property "sonar.scm.provider", "git"
property "sonar.java.coveragePlugin", "jacoco"
property "sonar.junit.reportsPath", "build/test-results/test/"
}
}