align with es221

This commit is contained in:
Jörg Prante 2021-04-28 17:52:41 +02:00
parent 9c606eebdd
commit b7d3184200
12 changed files with 30 additions and 28 deletions

View file

@ -8,6 +8,8 @@ import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable { public interface BasicClient extends Closeable {
void init(Settings settings);
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit); void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);
/** /**
@ -23,8 +25,6 @@ public interface BasicClient extends Closeable {
*/ */
ElasticsearchClient getClient(); ElasticsearchClient getClient();
void init(Settings settings);
/** /**
* Get cluster name. * Get cluster name.
* @return the cluster name * @return the cluster name

View file

@ -65,7 +65,7 @@ public interface BulkClient extends BasicClient, Flushable {
* Delete request. * Delete request.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @return this * @return this
*/ */
BulkClient delete(IndexDefinition indexDefinition, String id); BulkClient delete(IndexDefinition indexDefinition, String id);
@ -85,7 +85,7 @@ public interface BulkClient extends BasicClient, Flushable {
* Note that updates only work correctly when all operations between nodes are synchronized. * Note that updates only work correctly when all operations between nodes are synchronized.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
*/ */
@ -95,7 +95,7 @@ public interface BulkClient extends BasicClient, Flushable {
* Update document. Use with precaution! Does not work in all cases. * Update document. Use with precaution! Does not work in all cases.
* *
* @param indexDefinition the index definition * @param indexDefinition the index definition
* @param id the id * @param id the id
* @param source the source * @param source the source
* @return this * @return this
*/ */

View file

@ -131,6 +131,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (isIndexDefinitionDisabled(indexDefinition)) { if (isIndexDefinitionDisabled(indexDefinition)) {
return -1; return -1;
} }
ensureClientIsPresent();
String index = indexDefinition.getFullIndexName(); String index = indexDefinition.getFullIndexName();
GetSettingsRequest request = new GetSettingsRequest().indices(index); GetSettingsRequest request = new GetSettingsRequest().indices(index);
GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet(); GetSettingsResponse response = client.execute(GetSettingsAction.INSTANCE, request).actionGet();
@ -168,6 +169,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
if (index == null) { if (index == null) {
return Collections.emptyMap(); return Collections.emptyMap();
} }
ensureClientIsPresent();
GetAliasesRequest getAliasesRequest = new GetAliasesRequest().indices(index); GetAliasesRequest getAliasesRequest = new GetAliasesRequest().indices(index);
return getFilters(client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet()); return getFilters(client.execute(GetAliasesAction.INSTANCE, getAliasesRequest).actionGet());
} }
@ -331,7 +333,7 @@ public abstract class AbstractAdminClient extends AbstractBasicClient implements
} }
} }
if (candidateIndices.isEmpty()) { if (candidateIndices.isEmpty()) {
logger.info("no candidates found"); logger.info("no candidates found");
return new EmptyPruneResult(); return new EmptyPruneResult();
} }
if (mintokeep > 0 && candidateIndices.size() <= mintokeep) { if (mintokeep > 0 && candidateIndices.size() <= mintokeep) {

View file

@ -71,9 +71,9 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override @Override
public void init(Settings settings) { public void init(Settings settings) {
this.settings = settings;
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(',')); logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
this.settings = settings;
setClient(createClient(settings)); setClient(createClient(settings));
} }
} }
@ -176,6 +176,7 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override @Override
public boolean isIndexExists(IndexDefinition indexDefinition) { public boolean isIndexExists(IndexDefinition indexDefinition) {
ensureClientIsPresent();
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(); IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest();
indicesExistsRequest.indices(indexDefinition.getFullIndexName()); indicesExistsRequest.indices(indexDefinition.getFullIndexName());
IndicesExistsResponse indicesExistsResponse = IndicesExistsResponse indicesExistsResponse =
@ -185,18 +186,17 @@ public abstract class AbstractBasicClient implements BasicClient {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
ensureClientIsPresent();
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
closeClient(settings);
if (scheduler != null) { if (scheduler != null) {
scheduler.shutdown(); scheduler.shutdown();
} }
closeClient(settings);
} }
} }
protected abstract ElasticsearchClient createClient(Settings settings); protected abstract ElasticsearchClient createClient(Settings settings);
protected abstract void closeClient(Settings settings) throws IOException; protected abstract void closeClient(Settings settings);
protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) { protected void updateIndexSetting(String index, String key, Object value, long timeout, TimeUnit timeUnit) {
ensureClientIsPresent(); ensureClientIsPresent();

View file

@ -191,8 +191,10 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override @Override
public BulkClient index(IndexRequest indexRequest) { public BulkClient index(IndexRequest indexRequest) {
ensureClientIsPresent(); if (bulkProcessor != null) {
bulkProcessor.add(indexRequest); ensureClientIsPresent();
bulkProcessor.add(indexRequest);
}
return this; return this;
} }

View file

@ -94,7 +94,6 @@ public class DefaultIndexDefinition implements IndexDefinition {
if (settings.get("settings") != null && settings.get("mapping") != null) { if (settings.get("settings") != null && settings.get("mapping") != null) {
setSettings(findSettingsFrom(settings.get("settings"))); setSettings(findSettingsFrom(settings.get("settings")));
setMappings(findMappingsFrom(settings.get("mapping"))); setMappings(findMappingsFrom(settings.get("mapping")));
setReplicaCount(settings.getAsInt("replica", 0));
boolean shift = settings.getAsBoolean("shift", false); boolean shift = settings.getAsBoolean("shift", false);
setShift(shift); setShift(shift);
if (shift) { if (shift) {

View file

@ -9,7 +9,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.xbib.elx.common.AbstractBulkClient; import org.xbib.elx.common.AbstractBulkClient;
import java.io.IOException;
/** /**
* Elasticsearch HTTP bulk client. * Elasticsearch HTTP bulk client.
@ -35,7 +34,7 @@ public class HttpBulkClient extends AbstractBulkClient implements ElasticsearchC
} }
@Override @Override
protected void closeClient(Settings settings) throws IOException { protected void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }

View file

@ -9,7 +9,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.xbib.elx.common.AbstractSearchClient; import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
/** /**
* Elasticsearch HTTP search client. * Elasticsearch HTTP search client.
@ -35,7 +34,7 @@ public class HttpSearchClient extends AbstractSearchClient implements Elasticsea
} }
@Override @Override
protected void closeClient(Settings settings) throws IOException { protected void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }

View file

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractAdminClient; import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
public class NodeAdminClient extends AbstractAdminClient { public class NodeAdminClient extends AbstractAdminClient {
@ -20,7 +19,7 @@ public class NodeAdminClient extends AbstractAdminClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

View file

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractBulkClient; import org.xbib.elx.common.AbstractBulkClient;
import java.io.IOException;
public class NodeBulkClient extends AbstractBulkClient { public class NodeBulkClient extends AbstractBulkClient {
@ -20,7 +19,7 @@ public class NodeBulkClient extends AbstractBulkClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }

View file

@ -40,11 +40,15 @@ public class NodeClientHelper {
key -> innerCreateClient(settings)); key -> innerCreateClient(settings));
} }
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
ElasticsearchClient client = clientMap.remove(settings.get("cluster.name")); clientMap.remove(settings.get("cluster.name"));
if (client != null) { logger.debug("closing node...");
logger.debug("closing node..."); if (node != null) {
node.close(); try {
node.close();
} catch (IOException e) {
logger.log(Level.WARN, e.getMessage(), e);
}
node = null; node = null;
} }
} }

View file

@ -3,7 +3,6 @@ package org.xbib.elx.node;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.xbib.elx.common.AbstractSearchClient; import org.xbib.elx.common.AbstractSearchClient;
import java.io.IOException;
public class NodeSearchClient extends AbstractSearchClient { public class NodeSearchClient extends AbstractSearchClient {
@ -20,7 +19,7 @@ public class NodeSearchClient extends AbstractSearchClient {
} }
@Override @Override
public void closeClient(Settings settings) throws IOException { public void closeClient(Settings settings) {
helper.closeClient(settings); helper.closeClient(settings);
} }
} }