From 6af56268aab4f600675c143f75abfd82a2b362c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 20 Apr 2021 10:29:40 +0200 Subject: [PATCH] align with es221 --- .../java/org/xbib/elx/api/BasicClient.java | 5 +++ .../java/org/xbib/elx/api/BulkClient.java | 11 ++---- .../java/org/xbib/elx/api/BulkController.java | 36 ------------------- .../java/org/xbib/elx/api/BulkListener.java | 3 +- .../java/org/xbib/elx/api/BulkMetric.java | 7 ++-- .../java/org/xbib/elx/api/BulkProcessor.java | 23 ++++++++---- .../org/xbib/elx/api/BulkRequestHandler.java | 11 ------ 7 files changed, 30 insertions(+), 66 deletions(-) delete mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkController.java delete mode 100644 elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java diff --git a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java index 54b8b83..b65d05a 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BasicClient.java @@ -4,10 +4,13 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public interface BasicClient extends Closeable { + void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit) throws IOException; + /** * Set an Elasticsearch client to extend from it. May be null for TransportClient. * @param client the Elasticsearch client @@ -59,4 +62,6 @@ public interface BasicClient extends Closeable { long getSearchableDocs(IndexDefinition indexDefinition); boolean isIndexExists(IndexDefinition indexDefinition); + + ScheduledThreadPoolExecutor getScheduler(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java index 8072981..0ad3aa0 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkClient.java @@ -10,12 +10,6 @@ import java.util.concurrent.TimeUnit; public interface BulkClient extends BasicClient, Flushable { - /** - * Get bulk control. - * @return the bulk control - */ - BulkController getBulkController(); - /** * Create a new index. * @param indexDefinition the index definition @@ -42,8 +36,8 @@ public interface BulkClient extends BasicClient, Flushable { * Add index request. Each request will be added to a queue for bulking requests. * Submitting request will be done when limits are exceeded. * - * @param indexDefinition the index definition - * @param id the id + * @param indexDefinition the index definition + * @param id the id * @param create true if document must be created * @param source the source * @return this @@ -155,4 +149,5 @@ public interface BulkClient extends BasicClient, Flushable { */ void flushIndex(IndexDefinition indexDefinition); + BulkProcessor getBulkController(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java b/elx-api/src/main/java/org/xbib/elx/api/BulkController.java deleted file mode 100644 index daaf68f..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkController.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.xbib.elx.api; - -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.settings.Settings; - -import java.io.Closeable; -import java.io.Flushable; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public interface BulkController extends Closeable, Flushable { - - void init(Settings settings); - - void inactivate(); - - BulkProcessor getBulkProcessor(); - - BulkMetric getBulkMetric(); - - Throwable getLastBulkError(); - - void startBulkMode(IndexDefinition indexDefinition) throws IOException; - - void bulkIndex(IndexRequest indexRequest); - - void bulkDelete(DeleteRequest deleteRequest); - - void bulkUpdate(UpdateRequest updateRequest); - - boolean waitForBulkResponses(long timeout, TimeUnit timeUnit); - - void stopBulkMode(IndexDefinition indexDefinition) throws IOException; -} diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java index f434ed4..362dcb5 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkListener.java @@ -2,11 +2,12 @@ package org.xbib.elx.api; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import java.io.Closeable; /** * A bulk listener for following executions of bulk operations. */ -public interface BulkListener { +public interface BulkListener extends Closeable { /** * Callback before the bulk is executed. diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java index b7d11e0..cb27111 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkMetric.java @@ -1,6 +1,7 @@ package org.xbib.elx.api; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.xbib.metrics.api.Count; import org.xbib.metrics.api.Metered; @@ -8,8 +9,6 @@ import java.io.Closeable; public interface BulkMetric extends Closeable { - void init(Settings settings); - void markTotalIngest(long n); Metered getTotalIngest(); @@ -31,4 +30,6 @@ public interface BulkMetric extends Closeable { void start(); void stop(); + + void recalculate(BulkRequest request, BulkResponse response); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java index fab2a18..a59fd28 100644 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java +++ b/elx-api/src/main/java/org/xbib/elx/api/BulkProcessor.java @@ -4,21 +4,30 @@ import org.elasticsearch.action.DocWriteRequest; import java.io.Closeable; import java.io.Flushable; +import java.io.IOException; import java.util.concurrent.TimeUnit; public interface BulkProcessor extends Closeable, Flushable { - void setBulkActions(int bulkActions); + void setEnabled(boolean enabled); - int getBulkActions(); + void startBulkMode(IndexDefinition indexDefinition) throws IOException; - void setBulkSize(long bulkSize); + void stopBulkMode(IndexDefinition indexDefinition) throws IOException; - long getBulkSize(); + void setMaxBulkActions(int bulkActions); - BulkProcessor add(DocWriteRequest request); + int getMaxBulkActions(); - boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException; + void setMaxBulkVolume(long bulkSize); - boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; + long getMaxBulkVolume(); + + void add(DocWriteRequest request); + + boolean waitForBulkResponses(long timeout, TimeUnit unit); + + BulkMetric getBulkMetric(); + + Throwable getLastBulkError(); } diff --git a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java b/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java deleted file mode 100644 index 1bc3886..0000000 --- a/elx-api/src/main/java/org/xbib/elx/api/BulkRequestHandler.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.xbib.elx.api; - -import org.elasticsearch.action.bulk.BulkRequest; -import java.util.concurrent.TimeUnit; - -public interface BulkRequestHandler { - - void execute(BulkRequest bulkRequest, long executionId); - - boolean close(long timeout, TimeUnit unit) throws InterruptedException; -}