update to Netty 4.1.65, clean up dependencies (drop SSL deps), clean up close flag, show netty version info

This commit is contained in:
Jörg Prante 2021-05-22 21:00:20 +02:00
parent 6f17677a11
commit 51d34952d0
26 changed files with 119 additions and 104 deletions

View file

@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
public interface BasicClient extends Closeable {
void init(Settings settings);
boolean init(Settings settings, String info);
void putClusterSetting(String key, Object value, long timeout, TimeUnit timeUnit);

View file

@ -30,4 +30,6 @@ public interface BulkProcessor extends Closeable, Flushable {
void setMaxBulkVolume(long bulkSize);
long getMaxBulkVolume();
boolean isClosed();
}

View file

@ -30,4 +30,6 @@ public interface SearchMetric extends Closeable {
void start();
void stop();
boolean isClosed();
}

View file

@ -2,6 +2,5 @@ dependencies {
api project(':elx-api')
implementation "org.xbib:time:${rootProject.property('xbib-time.version')}"
testImplementation "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
testImplementation "io.netty:netty-codec-http:${project.property('netty.version')}"
testImplementation "io.netty:netty-transport:${project.property('netty.version')}"
testImplementation "org.xbib:netty-http-client:${project.property('xbib-netty-http.version')}"
}

View file

@ -4,6 +4,7 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -44,11 +45,10 @@ public abstract class AbstractBasicClient implements BasicClient {
private final ScheduledExecutorService executorService;
private final AtomicBoolean closed;
protected final AtomicBoolean closed;
public AbstractBasicClient() {
this.executorService = Executors.newScheduledThreadPool(2,
new DaemonThreadFactory("elx"));
this.executorService = Executors.newScheduledThreadPool(2, new DaemonThreadFactory("elx"));
closed = new AtomicBoolean(false);
}
@ -68,18 +68,31 @@ public abstract class AbstractBasicClient implements BasicClient {
}
@Override
public void init(Settings settings) {
this.settings = settings;
public boolean init(Settings settings, String infoString) {
if (closed.compareAndSet(false, true)) {
this.settings = settings;
logger.log(Level.INFO, String.format("Elx: %s on %s %s %s Java: %s %s %s %s ES: %s %s",
System.getProperty("user.name"),
System.getProperty("os.name"),
System.getProperty("os.arch"),
System.getProperty("os.version"),
System.getProperty("java.version"),
System.getProperty("java.vm.version"),
System.getProperty("java.vm.vendor"),
System.getProperty("java.vm.name"),
Version.CURRENT,
infoString));
logger.log(Level.INFO, "initializing with settings = " + settings.toDelimitedString(','));
setClient(createClient(settings));
return true;
}
return false;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
if (executorService != null) {
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
closeClient(settings);

View file

@ -26,7 +26,6 @@ import org.xbib.elx.api.IndexDefinition;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.xbib.elx.api.IndexDefinition.TYPE_NAME;
@ -36,19 +35,17 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
private BulkProcessor bulkProcessor;
private final AtomicBoolean closed;
public AbstractBulkClient() {
super();
closed = new AtomicBoolean(true);
}
@Override
public void init(Settings settings) {
if (closed.compareAndSet(true, false)) {
super.init(settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, info)) {
bulkProcessor = new DefaultBulkProcessor(this, settings);
return true;
}
return false;
}
@Override
@ -65,15 +62,12 @@ public abstract class AbstractBulkClient extends AbstractBasicClient implements
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
if (!bulkProcessor.isClosed()) {
logger.info("closing bulk processor");
ensureClientIsPresent();
if (bulkProcessor != null) {
logger.info("closing bulk processor");
bulkProcessor.close();
}
closeClient(settings);
super.close();
bulkProcessor.close();
}
super.close();
}
@Override

View file

@ -34,34 +34,30 @@ import java.util.stream.StreamSupport;
public abstract class AbstractSearchClient extends AbstractBasicClient implements SearchClient {
private final AtomicBoolean closed;
private SearchMetric searchMetric;
public AbstractSearchClient() {
super();
this.closed = new AtomicBoolean(true);
}
@Override
public void init(Settings settings) {
if (closed.compareAndSet(true, false)) {
super.init(settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, info)) {
if (settings.getAsBoolean(Parameters.SEARCH_METRIC_ENABLED.getName(),
Parameters.SEARCH_METRIC_ENABLED.getBoolean())) {
this.searchMetric = new DefaultSearchMetric(this, settings);
searchMetric.init(settings);
}
return true;
}
return false;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
super.close();
if (searchMetric != null) {
searchMetric.close();
}
super.close();
if (!searchMetric.isClosed()) {
searchMetric.close();
}
}

View file

@ -14,7 +14,6 @@ import org.xbib.elx.api.BasicClient;
import org.xbib.elx.api.SearchClientProvider;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
@ -146,13 +145,12 @@ public class ClientBuilder {
@SuppressWarnings("unchecked")
public <C extends BasicClient> C build() throws IOException {
Settings settings = settingsBuilder.build();
logger.log(Level.INFO, "settings = " + settings.toDelimitedString(','));
if (adminClientProvider != null) {
for (AdminClientProvider provider : ServiceLoader.load(AdminClientProvider.class, classLoader)) {
if (provider.getClass().isAssignableFrom(adminClientProvider)) {
C c = (C) provider.getClient();
c.setClient(client);
c.init(settings);
c.init(settings, null);
return c;
}
}
@ -162,7 +160,7 @@ public class ClientBuilder {
if (provider.getClass().isAssignableFrom(bulkClientProvider)) {
C c = (C) provider.getClient();
c.setClient(client);
c.init(settings);
c.init(settings, null);
return c;
}
}
@ -172,7 +170,7 @@ public class ClientBuilder {
if (provider.getClass().isAssignableFrom(searchClientProvider)) {
C c = (C) provider.getClient();
c.setClient(client);
c.init(settings);
c.init(settings, null);
return c;
}
}

View file

@ -60,6 +60,8 @@ public class DefaultBulkProcessor implements BulkProcessor {
public DefaultBulkProcessor(BulkClient bulkClient, Settings settings) {
this.bulkClient = bulkClient;
this.closed = new AtomicBoolean(false);
this.enabled = new AtomicBoolean(false);
int maxActionsPerRequest = settings.getAsInt(Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getName(),
Parameters.BULK_MAX_ACTIONS_PER_REQUEST.getInteger());
String flushIntervalStr = settings.get(Parameters.BULK_FLUSH_INTERVAL.getName(),
@ -82,17 +84,13 @@ public class DefaultBulkProcessor implements BulkProcessor {
this.bulkVolume = maxVolumePerRequest.getBytes();
}
this.bulkRequest = new BulkRequest();
this.closed = new AtomicBoolean(false);
this.enabled = new AtomicBoolean(false);
this.executionIdGen = new AtomicLong();
this.permits = settings.getAsInt(Parameters.BULK_PERMITS.getName(), Parameters.BULK_PERMITS.getInteger());
if (permits < 1) {
throw new IllegalArgumentException("must not be less 1 permits for bulk indexing");
}
this.semaphore = new ResizeableSemaphore(permits);
if (logger.isInfoEnabled()) {
logger.info("bulk processor now active");
}
logger.info("bulk processor now enabled");
setEnabled(true);
}
@ -121,6 +119,11 @@ public class DefaultBulkProcessor implements BulkProcessor {
return bulkVolume;
}
@Override
public boolean isClosed() {
return closed.get();
}
@Override
public ScheduledExecutorService getScheduler() {
return bulkClient.getScheduler();

View file

@ -12,6 +12,7 @@ import org.xbib.metrics.common.CountMetric;
import org.xbib.metrics.common.Meter;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class DefaultSearchMetric implements SearchMetric {
@ -37,8 +38,11 @@ public class DefaultSearchMetric implements SearchMetric {
private Long stopped;
private final AtomicBoolean closed;
public DefaultSearchMetric(SearchClient searchClient,
Settings settings) {
this.closed = new AtomicBoolean(true);
totalQuery = new Meter(searchClient.getScheduler());
currentQuery = new CountMetric();
queries = new CountMetric();
@ -55,7 +59,9 @@ public class DefaultSearchMetric implements SearchMetric {
@Override
public void init(Settings settings) {
start();
if (closed.compareAndSet(true, false)) {
start();
}
}
@Override
@ -117,10 +123,17 @@ public class DefaultSearchMetric implements SearchMetric {
this.future.cancel(true);
}
@Override
public boolean isClosed() {
return closed.get();
}
@Override
public void close() {
stop();
totalQuery.shutdown();
if (closed.compareAndSet(false, true)) {
stop();
totalQuery.shutdown();
}
}
private void log() {

View file

@ -14,7 +14,8 @@ public class MockAdminClient extends AbstractAdminClient {
}
@Override
public void init(Settings settings) {
public boolean init(Settings settings, String info) {
return true;
}
@Override

View file

@ -18,7 +18,8 @@ public class MockBulkClient extends AbstractBulkClient {
}
@Override
public void init(Settings settings) {
public boolean init(Settings settings, String info) {
return true;
}
@Override

View file

@ -14,7 +14,8 @@ public class MockSearchClient extends AbstractSearchClient {
}
@Override
public void init(Settings settings) {
public boolean init(Settings settings, String info) {
return true;
}
@Override

View file

@ -2,6 +2,4 @@ dependencies{
api project(':elx-common')
api "org.xbib:netty-http-client:${project.property('xbib-netty-http.version')}"
api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
runtimeOnly "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
runtimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
}

View file

@ -9,7 +9,6 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.xbib.elx.common.AbstractAdminClient;
import java.io.IOException;
/**
* Elasticsearch HTTP admin client.
@ -24,9 +23,12 @@ public class HttpAdminClient extends AbstractAdminClient implements Elasticsearc
}
@Override
public void init(Settings settings) {
super.init(settings);
helper.init(settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, "Netty: " + io.netty.util.Version.identify())) {
helper.init(settings);
return true;
}
return false;
}
@Override

View file

@ -23,9 +23,12 @@ public class HttpBulkClient extends AbstractBulkClient implements ElasticsearchC
}
@Override
public void init(Settings settings) {
super.init(settings);
helper.init(settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, io.netty.util.Version.identify().toString())) {
helper.init(settings);
return true;
}
return false;
}
@Override

View file

@ -23,9 +23,12 @@ public class HttpSearchClient extends AbstractSearchClient implements Elasticsea
}
@Override
public void init(Settings settings) {
super.init(settings);
helper.init(settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, "Netty: " + io.netty.util.Version.identify())) {
helper.init(settings);
return true;
}
return false;
}
@Override

View file

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout pattern="[%d{ISO8601}][%-5p][%-25c][%t] %m%n"/>
</Console>
</appenders>

View file

@ -1,6 +1,4 @@
dependencies {
api project(':elx-common')
api "org.elasticsearch.plugin:transport-netty4-client:${project.property('elasticsearch.version')}"
api "io.netty:netty-codec-http:${project.property('netty.version')}"
api "io.netty:netty-transport:${project.property('netty.version')}"
}

View file

@ -1,6 +1,4 @@
dependencies {
api project(':elx-common')
api "org.elasticsearch.plugin:transport-netty4-client:${rootProject.property('elasticsearch.version')}"
api "io.netty:netty-codec-http:${project.property('netty.version')}"
api "io.netty:netty-transport:${project.property('netty.version')}"
}

View file

@ -23,9 +23,12 @@ public class TransportAdminClient extends AbstractAdminClient {
}
@Override
public void init(Settings settings) {
super.init(settings);
helper.init((TransportClient) getClient(), settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, "Netty: " + io.netty.util.Version.identify())) {
helper.init((TransportClient) getClient(), settings);
return true;
}
return false;
}
@Override

View file

@ -23,9 +23,12 @@ public class TransportBulkClient extends AbstractBulkClient {
}
@Override
public void init(Settings settings) {
super.init(settings);
helper.init((TransportClient) getClient(), settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, "Netty: " + io.netty.util.Version.identify())) {
helper.init((TransportClient) getClient(), settings);
return true;
}
return false;
}
@Override

View file

@ -23,9 +23,12 @@ public class TransportSearchClient extends AbstractSearchClient {
}
@Override
public void init(Settings settings) {
super.init(settings);
helper.init((TransportClient) getClient(), settings);
public boolean init(Settings settings, String info) {
if (super.init(settings, "Netty: " + io.netty.util.Version.identify())) {
helper.init((TransportClient) getClient(), settings);
return true;
}
return false;
}
@Override

View file

@ -163,8 +163,8 @@ public class TestExtension implements ParameterResolver, BeforeEachCallback, Aft
.put(Parameters.PORT.getName(), port)
.put(Parameters.CLUSTER_TARGET_HEALTH.getName(), "YELLOW")
.put(Parameters.CLUSTER_TARGET_HEALTH_TIMEOUT.getName(), "1m")
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.FALSE)
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.FALSE)
.put(Parameters.BULK_METRIC_ENABLED.getName(), Boolean.TRUE)
.put(Parameters.SEARCH_METRIC_ENABLED.getName(), Boolean.TRUE)
.build();
}

View file

@ -5,11 +5,11 @@ version = 7.10.2.11
gradle.wrapper.version = 6.6.1
xbib-metrics.version = 2.1.0
xbib-time.version = 2.1.0
xbib-netty-http.version = 4.1.65.0
elasticsearch.version = 7.10.2
# ES 7.10.2 uses Jackson 2.10.4
jackson.version = 2.12.3
netty.version = 4.1.65.Final
tcnative.version = 2.0.39.Final
bouncycastle.version = 1.68
log4j.version = 2.14.0
# ES 7.10.2. uses Netty 4.1.49
xbib-netty-http.version = 4.1.65.0
# ES 7.10.2 uses log4j2 2.11.1
log4j.version = 2.14.1
junit.version = 5.7.1

View file

@ -1,5 +1,5 @@
def junitVersion = project.hasProperty('junit.version')?project.property('junit.version'):'5.7.0'
def junitVersion = project.hasProperty('junit.version')?project.property('junit.version'):'5.7.1'
def hamcrestVersion = project.hasProperty('hamcrest.version')?project.property('hamcrest.version'):'2.2'
dependencies {
@ -11,30 +11,11 @@ dependencies {
test {
useJUnitPlatform()
// for Lucene to access jdk.internal.ref and jdk.internal.misc in Java 11+
jvmArgs = [
// gradle default of 512m is too less for ES bulk indexing
'-Xmx2g',
'-Xms2g',
// do we need tricks with G1GC and real memory circuit breaker?
/*'-XX:+UseG1GC',
'-XX:+ParallelRefProcEnabled',
'-XX:MaxGCPauseMillis=50',
'-XX:+UnlockExperimentalVMOptions',
'-XX:+DisableExplicitGC',
'-XX:+AlwaysPreTouch',
'-XX:G1NewSizePercent=30',
'-XX:G1MaxNewSizePercent=40',
'-XX:G1HeapRegionSize=8M',
'-XX:G1ReservePercent=20',
'-XX:G1HeapWastePercent=5',
'-XX:G1MixedGCCountTarget=4',
'-XX:InitiatingHeapOccupancyPercent=15',
'-XX:G1MixedGCLiveThresholdPercent=90',
'-XX:G1RSetUpdatingPauseTimePercent=5',
'-XX:SurvivorRatio=32',
'-XX:+PerfDisableSharedMem',
'-XX:MaxTenuringThreshold=1',*/
// for Lucene to access jdk.internal.ref and jdk.internal.misc in Java 11+
'--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED'