diff --git a/build.gradle b/build.gradle index 1e29b59..3779d54 100644 --- a/build.gradle +++ b/build.gradle @@ -2,6 +2,7 @@ import java.time.ZonedDateTime import java.time.format.DateTimeFormatter plugins { + id "com.github.spotbugs" version "1.6.1" id "org.sonarqube" version "2.6.1" id "io.codearte.nexus-staging" version "0.11.0" id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.0" @@ -23,10 +24,10 @@ printf "Date: %s\nHost: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGradle: %s Groovy: % apply plugin: 'java' apply plugin: 'maven' apply plugin: 'signing' +apply plugin: "com.github.spotbugs" apply plugin: "io.codearte.nexus-staging" apply plugin: 'org.xbib.gradle.plugin.asciidoctor' - configurations { alpnagent asciidoclet @@ -37,7 +38,7 @@ dependencies { compile "org.xbib:net-url:${project.property('xbib-net-url.version')}" compile "io.netty:netty-codec-http2:${project.property('netty.version')}" compile "io.netty:netty-handler-proxy:${project.property('netty.version')}" - testCompile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}" + compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}" testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}" testCompile "junit:junit:${project.property('junit.version')}" @@ -66,7 +67,7 @@ test { jvmArgs "-javaagent:" + configurations.alpnagent.asPath } testLogging { - showStandardStreams = false + showStandardStreams = true exceptionFormat = 'full' } } @@ -121,5 +122,109 @@ if (project.hasProperty('signing.keyId')) { } } -apply from: 'gradle/ext.gradle' -apply from: 'gradle/publish.gradle' + +spotbugs { + effort = "max" + reportLevel = "low" + //includeFilter = file("findbugs-exclude.xml") +} + +tasks.withType(com.github.spotbugs.SpotBugsTask) { + ignoreFailures = true + reports { + xml.enabled = false + html.enabled = true + } +} + +sonarqube { + properties { + property "sonar.projectName", "${project.group} ${project.name}" + property "sonar.sourceEncoding", "UTF-8" + property "sonar.tests", "src/test/java" + property "sonar.scm.provider", "git" + property "sonar.junit.reportsPath", "build/test-results/test/" + } +} + +ext { + user = 'jprante' + name = 'netty-http-client' + description = 'A java client for Elasticsearch' + scmUrl = 'https://github.com/' + user + '/' + name + scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git' + scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git' +} + + +task xbibUpload(type: Upload) { + group = 'publish' + configuration = configurations.archives + uploadDescriptor = true + repositories { + if (project.hasProperty("xbibUsername")) { + mavenDeployer { + configuration = configurations.wagon + repository(url: 'sftp://xbib.org/repository') { + authentication(userName: xbibUsername, privateKey: xbibPrivateKey) + } + } + } + } +} + +task sonaTypeUpload(type: Upload) { + group = 'publish' + configuration = configurations.archives + uploadDescriptor = true + repositories { + if (project.hasProperty('ossrhUsername')) { + mavenDeployer { + beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) } + repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') { + authentication(userName: ossrhUsername, password: ossrhPassword) + } + snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') { + authentication(userName: ossrhUsername, password: ossrhPassword) + } + pom.project { + groupId project.group + artifactId project.name + version project.version + name project.name + description description + packaging 'jar' + inceptionYear '2012' + url scmUrl + organization { + name 'xbib' + url 'http://xbib.org' + } + developers { + developer { + id user + name 'Jörg Prante' + email 'joergprante@gmail.com' + url 'https://github.com/jprante' + } + } + scm { + url scmUrl + connection scmConnection + developerConnection scmDeveloperConnection + } + licenses { + license { + name 'The Apache License, Version 2.0' + url 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + } + } + } + } +} + +nexusStaging { + packageGroup = "org.xbib" +} diff --git a/gradle.properties b/gradle.properties index 8bbb631..d0ddc09 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,8 +1,8 @@ group = org.xbib name = netty-http-client -version = 4.1.16.0 +version = 4.1.22.2 -netty.version = 4.1.16.Final +netty.version = 4.1.22.Final tcnative.version = 2.0.7.Final conscrypt.version = 1.0.1 xbib-net-url.version = 1.1.0 diff --git a/gradle/ext.gradle b/gradle/ext.gradle index dcb8f32..e69de29 100644 --- a/gradle/ext.gradle +++ b/gradle/ext.gradle @@ -1,9 +0,0 @@ - -ext { - user = 'jprante' - name = 'netty-http-client' - description = 'A java client for Elasticsearch' - scmUrl = 'https://github.com/' + user + '/' + name - scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git' - scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git' -} diff --git a/gradle/publish.gradle b/gradle/publish.gradle index 0935c6f..e69de29 100644 --- a/gradle/publish.gradle +++ b/gradle/publish.gradle @@ -1,72 +0,0 @@ - -task xbibUpload(type: Upload) { - group = 'publish' - configuration = configurations.archives - uploadDescriptor = true - repositories { - if (project.hasProperty("xbibUsername")) { - mavenDeployer { - configuration = configurations.wagon - repository(url: 'sftp://xbib.org/repository') { - authentication(userName: xbibUsername, privateKey: xbibPrivateKey) - } - } - } - } -} - -task sonaTypeUpload(type: Upload) { - group = 'publish' - configuration = configurations.archives - uploadDescriptor = true - repositories { - if (project.hasProperty('ossrhUsername')) { - mavenDeployer { - beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) } - repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') { - authentication(userName: ossrhUsername, password: ossrhPassword) - } - snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') { - authentication(userName: ossrhUsername, password: ossrhPassword) - } - pom.project { - groupId project.group - artifactId project.name - version project.version - name project.name - description description - packaging 'jar' - inceptionYear '2012' - url scmUrl - organization { - name 'xbib' - url 'http://xbib.org' - } - developers { - developer { - id user - name 'Jörg Prante' - email 'joergprante@gmail.com' - url 'https://github.com/jprante' - } - } - scm { - url scmUrl - connection scmConnection - developerConnection scmDeveloperConnection - } - licenses { - license { - name 'The Apache License, Version 2.0' - url 'http://www.apache.org/licenses/LICENSE-2.0.txt' - } - } - } - } - } - } -} - -nexusStaging { - packageGroup = "org.xbib" -} diff --git a/gradle/sonarqube.gradle b/gradle/sonarqube.gradle index 3985a4f..e69de29 100644 --- a/gradle/sonarqube.gradle +++ b/gradle/sonarqube.gradle @@ -1,39 +0,0 @@ -tasks.withType(FindBugs) { - ignoreFailures = true - reports { - xml.enabled = false - html.enabled = true - } -} -tasks.withType(Pmd) { - ignoreFailures = true - reports { - xml.enabled = true - html.enabled = true - } -} -tasks.withType(Checkstyle) { - ignoreFailures = true - reports { - xml.enabled = true - html.enabled = true - } -} - -jacocoTestReport { - reports { - xml.enabled = true - csv.enabled = false - } -} - -sonarqube { - properties { - property "sonar.projectName", "${project.group} ${project.name}" - property "sonar.sourceEncoding", "UTF-8" - property "sonar.tests", "src/test/java" - property "sonar.scm.provider", "git" - property "sonar.java.coveragePlugin", "jacoco" - property "sonar.junit.reportsPath", "build/test-results/test/" - } -} diff --git a/src/main/java/org/xbib/netty/http/client/Client.java b/src/main/java/org/xbib/netty/http/client/Client.java index 8700e76..e5efefa 100644 --- a/src/main/java/org/xbib/netty/http/client/Client.java +++ b/src/main/java/org/xbib/netty/http/client/Client.java @@ -2,38 +2,54 @@ package org.xbib.netty.http.client; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; +import org.xbib.net.URL; import org.xbib.netty.http.client.handler.http1.HttpChannelInitializer; import org.xbib.netty.http.client.handler.http1.HttpResponseHandler; import org.xbib.netty.http.client.handler.http2.Http2ChannelInitializer; import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler; import org.xbib.netty.http.client.handler.http2.Http2SettingsHandler; -import org.xbib.netty.http.client.pool.Pool; -import org.xbib.netty.http.client.pool.SimpleChannelPool; +import org.xbib.netty.http.client.pool.BoundedChannelPool; import org.xbib.netty.http.client.transport.Http2Transport; import org.xbib.netty.http.client.transport.HttpTransport; import org.xbib.netty.http.client.transport.Transport; import org.xbib.netty.http.client.util.NetworkUtils; +import javax.net.ssl.SNIHostName; +import javax.net.ssl.SNIServerName; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLParameters; import javax.net.ssl.TrustManagerFactory; import java.io.IOException; import java.security.KeyStoreException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -45,7 +61,22 @@ public final class Client { private static final ThreadFactory httpClientThreadFactory = new HttpClientThreadFactory(); static { - NetworkUtils.extendSystemProperties(); + if (System.getProperty("xbib.netty.http.client.extendsystemproperties") != null) { + NetworkUtils.extendSystemProperties(); + } + // change Netty defaults to safer ones, but still allow override from arg line + if (System.getProperty("io.netty.noUnsafe") == null) { + System.setProperty("io.netty.noUnsafe", Boolean.toString(true)); + } + if (System.getProperty("io.netty.noKeySetOptimization") == null) { + System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true)); + } + if (System.getProperty("io.netty.recycler.maxCapacity") == null) { + System.setProperty("io.netty.recycler.maxCapacity", Integer.toString(0)); + } + if (System.getProperty("io.netty.leakDetection.level") == null) { + System.setProperty("io.netty.leakDetection.level", "advanced"); + } } private final ClientConfig clientConfig; @@ -68,7 +99,7 @@ public final class Client { private TransportListener transportListener; - private Pool pool; + private BoundedChannelPool pool; public Client() { this(new ClientConfig()); @@ -84,40 +115,50 @@ public final class Client { this.clientConfig = clientConfig; initializeTrustManagerFactory(clientConfig); this.byteBufAllocator = byteBufAllocator != null ? - byteBufAllocator : PooledByteBufAllocator.DEFAULT; - this.eventLoopGroup = eventLoopGroup != null ? - eventLoopGroup : new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory); - this.socketChannelClass = socketChannelClass != null ? - socketChannelClass : NioSocketChannel.class; + byteBufAllocator : ByteBufAllocator.DEFAULT; + this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ? + new EpollEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory) : + new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory); + this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ? + EpollSocketChannel.class : NioSocketChannel.class; this.bootstrap = new Bootstrap() .group(this.eventLoopGroup) .channel(this.socketChannelClass) + //.option(ChannelOption.ALLOCATOR, byteBufAllocator) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNodelay()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isKeepAlive()) .option(ChannelOption.SO_REUSEADDR, clientConfig.isReuseAddr()) .option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSendBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getTcpReceiveBufferSize()) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) - .option(ChannelOption.ALLOCATOR, byteBufAllocator); + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, clientConfig.getWriteBufferWaterMark()); this.httpResponseHandler = new HttpResponseHandler(); this.http2SettingsHandler = new Http2SettingsHandler(); this.http2ResponseHandler = new Http2ResponseHandler(); this.transports = new CopyOnWriteArrayList<>(); - List nodes = clientConfig.getNodes(); - if (nodes != null && !nodes.isEmpty()) { - Integer limit = clientConfig.getNodeConnectionLimit(); - if (limit == null || limit > nodes.size()) { - limit = nodes.size(); - } - if (limit < 1) { + if (hasPooledConnections()) { + List nodes = clientConfig.getPoolNodes(); + Integer limit = clientConfig.getPoolNodeConnectionLimit(); + if (limit == null || limit < 1) { limit = 1; } Semaphore semaphore = new Semaphore(limit); - Integer retries = clientConfig.getRetriesPerNode(); + Integer retries = clientConfig.getRetriesPerPoolNode(); if (retries == null || retries < 0) { retries = 0; } - this.pool = new SimpleChannelPool<>(semaphore, nodes, bootstrap, null, retries); + ClientChannelPoolHandler clientChannelPoolHandler = new ClientChannelPoolHandler(); + this.pool = new BoundedChannelPool<>(semaphore, clientConfig.getPoolVersion(), + clientConfig.isPoolSecure(), nodes, bootstrap, clientChannelPoolHandler, retries); + Integer nodeConnectionLimit = clientConfig.getPoolNodeConnectionLimit(); + if (nodeConnectionLimit == null || nodeConnectionLimit == 0) { + nodeConnectionLimit = nodes.size(); + } + try { + this.pool.prepare(nodeConnectionLimit); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } } } @@ -133,52 +174,115 @@ public final class Client { return byteBufAllocator; } + public EventLoopGroup getEventLoopGroup() { + return eventLoopGroup; + } + public void setTransportListener(TransportListener transportListener) { this.transportListener = transportListener; } + public boolean hasPooledConnections() { + return !clientConfig.getPoolNodes().isEmpty(); + } + + public BoundedChannelPool getPool() { + return pool; + } + public void logDiagnostics(Level level) { logger.log(level, () -> "OpenSSL available: " + OpenSsl.isAvailable() + " OpenSSL ALPN support: " + OpenSsl.isAlpnSupported() + - " Local host name: " + NetworkUtils.getLocalHostName("localhost")); + " Local host name: " + NetworkUtils.getLocalHostName("localhost") + + " event loop group: " + eventLoopGroup + + " socket: " + socketChannelClass.getName() + + " allocator: " + byteBufAllocator.getClass().getName()); logger.log(level, NetworkUtils::displayNetworkInterfaces); } - public int getTimeout() { - return clientConfig.getReadTimeoutMillis(); + public Transport newTransport() { + return newTransport(null); + } + + public Transport newTransport(URL url, HttpVersion httpVersion) { + return newTransport(HttpAddress.of(url, httpVersion)); } public Transport newTransport(HttpAddress httpAddress) { - Transport transport; - if (httpAddress.getVersion().majorVersion() < 2) { - transport = new HttpTransport(this, httpAddress); - } else { - transport = new Http2Transport(this, httpAddress); + Transport transport = null; + if (httpAddress != null) { + if (httpAddress.getVersion().majorVersion() == 1) { + transport = new HttpTransport(this, httpAddress); + } else { + transport = new Http2Transport(this, httpAddress); + } + } else if (hasPooledConnections()) { + if (pool.getVersion().majorVersion() == 1) { + transport = new HttpTransport(this, null); + } else { + transport = new Http2Transport(this, null); + } } - if (transportListener != null) { - transportListener.onOpen(transport); + if (transport != null) { + if (transportListener != null) { + transportListener.onOpen(transport); + } + transports.add(transport); } - transports.add(transport); return transport; } - public Channel newChannel(HttpAddress httpAddress) throws InterruptedException { - HttpVersion httpVersion = httpAddress.getVersion(); - ChannelInitializer initializer; - Channel channel; - if (httpVersion.majorVersion() < 2) { - initializer = new HttpChannelInitializer(clientConfig, httpAddress, httpResponseHandler); - channel = bootstrap.handler(initializer) - .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); + public Channel newChannel(HttpAddress httpAddress) throws IOException { + Channel channel = null; + if (httpAddress != null) { + HttpVersion httpVersion = httpAddress.getVersion(); + ChannelInitializer initializer; + SslHandler sslHandler = newSslHandler(clientConfig, byteBufAllocator, httpAddress); + if (httpVersion.majorVersion() == 1) { + initializer = new HttpChannelInitializer(clientConfig, httpAddress, + sslHandler, httpResponseHandler); + } else { + initializer = new Http2ChannelInitializer(clientConfig, httpAddress, + sslHandler, http2SettingsHandler, http2ResponseHandler); + } + try { + channel = bootstrap.handler(initializer) + .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); + } catch (InterruptedException e) { + throw new IOException(e); + } } else { - initializer = new Http2ChannelInitializer(clientConfig, httpAddress, - http2SettingsHandler, http2ResponseHandler); - channel = bootstrap.handler(initializer) - .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); + if (hasPooledConnections()) { + try { + channel = pool.acquire(); + } catch (Exception e) { + throw new IOException(e); + } + } else { + throw new UnsupportedOperationException(); + } } return channel; } + public Channel newChannel() throws IOException { + return newChannel(null); + } + + public void releaseChannel(Channel channel) throws IOException{ + if (hasPooledConnections()) { + try { + pool.release(channel); + } catch (Exception e) { + throw new IOException(e); + } + } else { + if (channel != null) { + channel.close(); + } + } + } + public Transport execute(Request request) throws IOException { Transport transport = newTransport(HttpAddress.of(request)); transport.execute(request); @@ -190,8 +294,14 @@ public final class Client { return newTransport(HttpAddress.of(request)).execute(request, supplier); } + public Transport pooledExecute(Request request) throws IOException { + Transport transport = newTransport(); + transport.execute(request); + return transport; + } + /** - * For following redirects by a chain of transports. + * For following redirects, construct a new transport. * @param transport the previous transport * @param request the new request for continuing the request. */ @@ -203,6 +313,13 @@ public final class Client { close(nextTransport); } + /** + * Retry request by following a back-off strategy. + * + * @param transport the transport to retry + * @param request the request to retry + * @throws IOException if retry failed + */ public void retry(Transport transport, Request request) throws IOException { transport.execute(request); transport.get(); @@ -213,7 +330,7 @@ public final class Client { return newTransport(HttpAddress.of(request)); } - public void close(Transport transport) { + public void close(Transport transport) throws IOException { if (transportListener != null) { transportListener.onClose(transport); } @@ -221,21 +338,29 @@ public final class Client { transports.remove(transport); } - public void close() { + public void close() throws IOException { for (Transport transport : transports) { close(transport); } } - public void shutdown() { - eventLoopGroup.shutdownGracefully(); - } - - public void shutdownGracefully() { + public void shutdownGracefully() throws IOException { + if (hasPooledConnections()) { + pool.close(); + } close(); shutdown(); } + public void shutdown() { + eventLoopGroup.shutdownGracefully(); + try { + eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + } + } + /** * Initialize trust manager factory once per client lifecycle. * @param clientConfig the client config @@ -251,6 +376,60 @@ public final class Client { } } + private static SslHandler newSslHandler(ClientConfig clientConfig, ByteBufAllocator allocator, HttpAddress httpAddress) { + try { + SslContext sslContext = newSslContext(clientConfig); + SslHandler sslHandler = sslContext.newHandler(allocator); + SSLEngine engine = sslHandler.engine(); + List serverNames = clientConfig.getServerNamesForIdentification(); + if (serverNames.isEmpty()) { + serverNames = Collections.singletonList(httpAddress.getInetSocketAddress().getHostName()); + } + SSLParameters params = engine.getSSLParameters(); + params.setEndpointIdentificationAlgorithm("HTTPS"); + List sniServerNames = new ArrayList<>(); + for (String serverName : serverNames) { + sniServerNames.add(new SNIHostName(serverName)); + } + params.setServerNames(sniServerNames); + engine.setSSLParameters(params); + switch (clientConfig.getClientAuthMode()) { + case NEED: + engine.setNeedClientAuth(true); + break; + case WANT: + engine.setWantClientAuth(true); + break; + default: + break; + } + return sslHandler; + } catch (SSLException e) { + throw new IllegalArgumentException(e); + } + } + + private static SslContext newSslContext(ClientConfig clientConfig) throws SSLException { + SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() + .sslProvider(clientConfig.getSslProvider()) + .ciphers(Http2SecurityUtil.CIPHERS, clientConfig.getCipherSuiteFilter()) + .applicationProtocolConfig(newApplicationProtocolConfig()); + if (clientConfig.getSslContextProvider() != null) { + sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); + } + if (clientConfig.getTrustManagerFactory() != null) { + sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory()); + } + return sslContextBuilder.build(); + } + + private static ApplicationProtocolConfig newApplicationProtocolConfig() { + return new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2); + } + public interface TransportListener { void onOpen(Transport transport); @@ -269,4 +448,35 @@ public final class Client { return thread; } } + + class ClientChannelPoolHandler implements ChannelPoolHandler { + + @Override + public void channelReleased(Channel channel) { + } + + @Override + public void channelAcquired(Channel channel) { + } + + @Override + public void channelCreated(Channel channel) { + HttpAddress httpAddress = channel.attr(pool.getAttributeKey()).get(); + HttpVersion httpVersion = httpAddress.getVersion(); + SslHandler sslHandler = newSslHandler(clientConfig, byteBufAllocator, httpAddress); + if (httpVersion.majorVersion() == 1) { + HttpChannelInitializer initializer = new HttpChannelInitializer(clientConfig, httpAddress, + sslHandler, httpResponseHandler); + if (channel instanceof SocketChannel) { + initializer.initChannel((SocketChannel) channel); + } + } else { + Http2ChannelInitializer initializer = new Http2ChannelInitializer(clientConfig, httpAddress, + sslHandler, http2SettingsHandler, http2ResponseHandler); + if (channel instanceof SocketChannel) { + initializer.initChannel((SocketChannel) channel); + } + } + } + } } diff --git a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java index 8727167..bec85c1 100644 --- a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java @@ -2,10 +2,14 @@ package org.xbib.netty.http.client; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import javax.net.ssl.TrustManagerFactory; import java.io.InputStream; @@ -155,11 +159,6 @@ public class ClientBuilder { return this; } - public ClientBuilder setTrustManagerFactory(TrustManagerFactory trustManagerFactory) { - clientConfig.setTrustManagerFactory(trustManagerFactory); - return this; - } - public ClientBuilder setKeyCert(InputStream keyCertChainInputStream, InputStream keyInputStream) { clientConfig.setKeyCert(keyCertChainInputStream, keyInputStream); return this; @@ -171,8 +170,13 @@ public class ClientBuilder { return this; } - public ClientBuilder setServerNameIdentification(boolean serverNameIdentification) { - clientConfig.setServerNameIdentification(serverNameIdentification); + public ClientBuilder setTrustManagerFactory(TrustManagerFactory trustManagerFactory) { + clientConfig.setTrustManagerFactory(trustManagerFactory); + return this; + } + + public ClientBuilder trustInsecure() { + clientConfig.setTrustManagerFactory(InsecureTrustManagerFactory.INSTANCE); return this; } @@ -186,6 +190,46 @@ public class ClientBuilder { return this; } + public ClientBuilder addPoolNode(HttpAddress httpAddress) { + clientConfig.addPoolNode(httpAddress); + return this; + } + + public ClientBuilder setPoolNodeConnectionLimit(int nodeConnectionLimit) { + clientConfig.setPoolNodeConnectionLimit(nodeConnectionLimit); + return this; + } + + public ClientBuilder setRetriesPerPoolNode(int retriesPerNode) { + clientConfig.setRetriesPerPoolNode(retriesPerNode); + return this; + } + + public ClientBuilder setPoolVersion(HttpVersion poolVersion) { + clientConfig.setPoolVersion(poolVersion); + return this; + } + + public ClientBuilder setPoolSecure(boolean poolSecure) { + clientConfig.setPoolSecure(poolSecure); + return this; + } + + public ClientBuilder addServerNameForIdentification(String serverName) { + clientConfig.addServerNameForIdentification(serverName); + return this; + } + + public ClientBuilder setHttp2Settings(Http2Settings http2Settings) { + clientConfig.setHttp2Settings(http2Settings); + return this; + } + + public ClientBuilder setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + clientConfig.setWriteBufferWaterMark(writeBufferWaterMark); + return this; + } + public Client build() { return new Client(clientConfig, byteBufAllocator, eventLoopGroup, socketChannelClass); } diff --git a/src/main/java/org/xbib/netty/http/client/ClientConfig.java b/src/main/java/org/xbib/netty/http/client/ClientConfig.java index d45ae1f..e1677d3 100644 --- a/src/main/java/org/xbib/netty/http/client/ClientConfig.java +++ b/src/main/java/org/xbib/netty/http/client/ClientConfig.java @@ -1,15 +1,22 @@ package org.xbib.netty.http.client; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.Epoll; +import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.logging.LogLevel; import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SupportedCipherSuiteFilter; +import org.xbib.netty.http.client.retry.BackOff; import javax.net.ssl.TrustManagerFactory; import java.io.InputStream; import java.security.KeyStore; import java.security.Provider; +import java.util.ArrayList; import java.util.List; public class ClientConfig { @@ -22,7 +29,18 @@ public class ClientConfig { boolean DEBUG = false; /** - * Default for thread count. + * Default debug log level. + */ + LogLevel DEFAULT_DEBUG_LOG_LEVEL = LogLevel.DEBUG; + + /** + * The default for selecting epoll. If available, select epoll. + */ + boolean EPOLL = Epoll.isAvailable(); + + /** + * If set to 0, then Netty will decide about thread count. + * Default is Runtime.getRuntime().availableProcessors() * 2 */ int THREAD_COUNT = 0; @@ -110,12 +128,40 @@ public class ClientConfig { */ CipherSuiteFilter CIPHER_SUITE_FILTER = SupportedCipherSuiteFilter.INSTANCE; - boolean USE_SERVER_NAME_IDENTIFICATION = true; - /** * Default for SSL client authentication. */ ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE; + + /** + * Default for pool retries per node. + */ + Integer RETRIES_PER_NODE = 0; + + /** + * Default pool HTTP version. + */ + HttpVersion POOL_VERSION = HttpVersion.HTTP_1_1; + + /** + * Default connection pool security. + */ + Boolean POOL_SECURE = false; + + /** + * Default HTTP/2 settings. + */ + Http2Settings HTTP2_SETTINGS = Http2Settings.defaultSettings(); + + /** + * Default write buffer water mark. + */ + WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = WriteBufferWaterMark.DEFAULT; + + /** + * Default for backoff. + */ + BackOff BACK_OFF = BackOff.ZERO_BACKOFF; } private static TrustManagerFactory TRUST_MANAGER_FACTORY; @@ -123,10 +169,6 @@ public class ClientConfig { static { try { TRUST_MANAGER_FACTORY = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - //InsecureTrustManagerFactory.INSTANCE; - //TRUST_MANAGER_FACTORY.init((KeyStore) null); - // java.lang.IllegalStateException: TrustManagerFactoryImpl is not initialized - //TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); } catch (Exception e) { TRUST_MANAGER_FACTORY = null; } @@ -134,10 +176,10 @@ public class ClientConfig { private boolean debug = Defaults.DEBUG; - /** - * If set to 0, then Netty will decide about thread count. - * Default is Runtime.getRuntime().availableProcessors() * 2 - */ + private LogLevel debugLogLevel = Defaults.DEFAULT_DEBUG_LOG_LEVEL; + + private boolean epoll = Defaults.EPOLL; + private int threadCount = Defaults.THREAD_COUNT; private boolean tcpNodelay = Defaults.TCP_NODELAY; @@ -178,8 +220,6 @@ public class ClientConfig { private KeyStore trustManagerKeyStore = null; - private boolean serverNameIdentification = Defaults.USE_SERVER_NAME_IDENTIFICATION; - private ClientAuthMode clientAuthMode = Defaults.SSL_CLIENT_AUTH_MODE; private InputStream keyCertChainInputStream; @@ -190,11 +230,23 @@ public class ClientConfig { private HttpProxyHandler httpProxyHandler; - private List nodes; + private List poolNodes = new ArrayList<>(); - private Integer nodeConnectionLimit; + private Integer poolNodeConnectionLimit; - private Integer retriesPerNode; + private Integer retriesPerPoolNode = Defaults.RETRIES_PER_NODE; + + private HttpVersion poolVersion = Defaults.POOL_VERSION; + + private Boolean poolSecure = Defaults.POOL_SECURE; + + private List serverNamesForIdentification = new ArrayList<>(); + + private Http2Settings http2Settings = Defaults.HTTP2_SETTINGS; + + private WriteBufferWaterMark writeBufferWaterMark = Defaults.WRITE_BUFFER_WATER_MARK; + + private BackOff backOff = Defaults.BACK_OFF; public ClientConfig setDebug(boolean debug) { this.debug = debug; @@ -215,6 +267,29 @@ public class ClientConfig { return debug; } + public ClientConfig setDebugLogLevel(LogLevel debugLogLevel) { + this.debugLogLevel = debugLogLevel; + return this; + } + + public LogLevel getDebugLogLevel() { + return debugLogLevel; + } + + public ClientConfig enableEpoll() { + this.epoll = true; + return this; + } + + public ClientConfig disableEpoll() { + this.epoll = false; + return this; + } + + public boolean isEpoll() { + return epoll; + } + public ClientConfig setThreadCount(int threadCount) { this.threadCount = threadCount; return this; @@ -341,6 +416,15 @@ public class ClientConfig { return enableGzip; } + public ClientConfig setHttp2Settings(Http2Settings http2Settings) { + this.http2Settings = http2Settings; + return this; + } + + public Http2Settings getHttp2Settings() { + return http2Settings; + } + public ClientConfig setSslProvider(SslProvider sslProvider) { this.sslProvider = sslProvider; return this; @@ -413,15 +497,6 @@ public class ClientConfig { return keyPassword; } - public ClientConfig setServerNameIdentification(boolean serverNameIdentification) { - this.serverNameIdentification = serverNameIdentification; - return this; - } - - public boolean isServerNameIdentification() { - return serverNameIdentification; - } - public ClientConfig setClientAuthMode(ClientAuthMode clientAuthMode) { this.clientAuthMode = clientAuthMode; return this; @@ -458,31 +533,81 @@ public class ClientConfig { return httpProxyHandler; } - public ClientConfig setNodes(List nodes) { - this.nodes = nodes; + public ClientConfig setPoolNodes(List poolNodes) { + this.poolNodes = poolNodes; return this; } - public List getNodes() { - return nodes; + public List getPoolNodes() { + return poolNodes; } - public ClientConfig setNodeConnectionLimit(Integer nodeConnectionLimit) { - this.nodeConnectionLimit = nodeConnectionLimit; + public ClientConfig addPoolNode(HttpAddress poolNodeAddress) { + this.poolNodes.add(poolNodeAddress); return this; } - public Integer getNodeConnectionLimit() { - return nodeConnectionLimit; - } - - public ClientConfig setRetriesPerNode(Integer retriesPerNode) { - this.retriesPerNode = retriesPerNode; + public ClientConfig setPoolNodeConnectionLimit(Integer poolNodeConnectionLimit) { + this.poolNodeConnectionLimit = poolNodeConnectionLimit; return this; } - public Integer getRetriesPerNode() { - return retriesPerNode; + public Integer getPoolNodeConnectionLimit() { + return poolNodeConnectionLimit; + } + + public ClientConfig setRetriesPerPoolNode(Integer retriesPerPoolNode) { + this.retriesPerPoolNode = retriesPerPoolNode; + return this; + } + + public Integer getRetriesPerPoolNode() { + return retriesPerPoolNode; + } + + public ClientConfig setPoolVersion(HttpVersion poolVersion) { + this.poolVersion = poolVersion; + return this; + } + + public HttpVersion getPoolVersion() { + return poolVersion; + } + + public ClientConfig setPoolSecure(boolean poolSecure) { + this.poolSecure = poolSecure; + return this; + } + + public boolean isPoolSecure() { + return poolSecure; + } + + public ClientConfig addServerNameForIdentification(String serverNameForIdentification) { + this.serverNamesForIdentification.add(serverNameForIdentification); + return this; + } + + public List getServerNamesForIdentification() { + return serverNamesForIdentification; + } + + public ClientConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + this.writeBufferWaterMark = writeBufferWaterMark; + return this; + } + + public WriteBufferWaterMark getWriteBufferWaterMark() { + return writeBufferWaterMark; + } + + public ClientConfig setBackOff(BackOff backOff) { + this.backOff = backOff; + return this; + } + + public BackOff getBackOff() { + return backOff; } @Override @@ -491,7 +616,5 @@ public class ClientConfig { sb.append("SSL=").append(sslProvider) .append(",SSL context provider=").append(sslContextProvider != null ? sslContextProvider.getName() : ""); return sb.toString(); - - } } diff --git a/src/main/java/org/xbib/netty/http/client/Request.java b/src/main/java/org/xbib/netty/http/client/Request.java index cfe76e2..d593115 100644 --- a/src/main/java/org/xbib/netty/http/client/Request.java +++ b/src/main/java/org/xbib/netty/http/client/Request.java @@ -10,14 +10,18 @@ import org.xbib.net.URL; import org.xbib.netty.http.client.listener.CookieListener; import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpResponseListener; +import org.xbib.netty.http.client.retry.BackOff; +import java.io.Closeable; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.concurrent.CompletableFuture; /** - * + * HTTP client request. */ -public class Request { +public class Request implements Closeable { private final URL base; @@ -33,7 +37,7 @@ public class Request { private final ByteBuf content; - private final int timeout; + private final long timeoutInMillis; private final boolean followRedirect; @@ -41,6 +45,12 @@ public class Request { private int redirectCount; + private final boolean isBackOff; + + private final BackOff backOff; + + private CompletableFuture completableFuture; + private HttpResponseListener responseListener; private HttpHeadersListener headersListener; @@ -50,7 +60,8 @@ public class Request { Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod, HttpHeaders headers, Collection cookies, String uri, ByteBuf content, - int timeout, boolean followRedirect, int maxRedirect, int redirectCount) { + long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount, + boolean isBackOff, BackOff backOff) { this.base = url; this.httpVersion = httpVersion; this.httpMethod = httpMethod; @@ -58,10 +69,12 @@ public class Request { this.cookies = cookies; this.uri = uri; this.content = content; - this.timeout = timeout; + this.timeoutInMillis = timeoutInMillis; this.followRedirect = followRedirect; this.maxRedirects = maxRedirect; this.redirectCount = redirectCount; + this.isBackOff = isBackOff; + this.backOff = backOff; } public URL base() { @@ -92,15 +105,27 @@ public class Request { return content; } - public int getTimeout() { - return timeout; + /** + * Return the timeout in milliseconds per request. This overrides the read timeout of the client. + * @return timeout timeout in milliseconds + */ + public long getTimeoutInMillis() { + return timeoutInMillis; } public boolean isFollowRedirect() { return followRedirect; } - public boolean checkRedirect() { + public boolean isBackOff() { + return isBackOff; + } + + public BackOff getBackOff() { + return backOff; + } + + public boolean canRedirect() { if (!followRedirect) { return false; } @@ -124,6 +149,15 @@ public class Request { return sb.toString(); } + public Request setCompletableFuture(CompletableFuture completableFuture) { + this.completableFuture = completableFuture; + return this; + } + + public CompletableFuture getCompletableFuture() { + return completableFuture; + } + public Request setHeadersListener(HttpHeadersListener httpHeadersListener) { this.headersListener = httpHeadersListener; return this; @@ -190,4 +224,11 @@ public class Request { public static RequestBuilder builder(HttpMethod httpMethod) { return new RequestBuilder().setMethod(httpMethod); } + + @Override + public void close() throws IOException { + if (content != null) { + content.release(); + } + } } diff --git a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java index a895b8e..b8dfb5f 100644 --- a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java @@ -16,6 +16,7 @@ import io.netty.util.AsciiString; import org.xbib.net.QueryParameters; import org.xbib.net.URL; import org.xbib.net.URLSyntaxException; +import org.xbib.netty.http.client.retry.BackOff; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -44,7 +45,7 @@ public class RequestBuilder { private static final boolean DEFAULT_FOLLOW_REDIRECT = true; - private static final int DEFAULT_TIMEOUT_MILLIS = 5000; + private static final long DEFAULT_TIMEOUT_MILLIS = -1L; private static final int DEFAULT_MAX_REDIRECT = 10; @@ -74,12 +75,16 @@ public class RequestBuilder { private ByteBuf content; - private int timeout; + private long timeoutInMillis; private boolean followRedirect; private int maxRedirects; + private boolean enableBackOff; + + private BackOff backOff; + RequestBuilder() { httpMethod = DEFAULT_METHOD; httpVersion = DEFAULT_HTTP_VERSION; @@ -87,7 +92,7 @@ public class RequestBuilder { gzip = DEFAULT_GZIP; keepalive = DEFAULT_KEEPALIVE; url = DEFAULT_URL; - timeout = DEFAULT_TIMEOUT_MILLIS; + timeoutInMillis = DEFAULT_TIMEOUT_MILLIS; followRedirect = DEFAULT_FOLLOW_REDIRECT; maxRedirects = DEFAULT_MAX_REDIRECT; headers = new DefaultHttpHeaders(); @@ -121,8 +126,8 @@ public class RequestBuilder { return this; } - public RequestBuilder setTimeout(int timeout) { - this.timeout = timeout; + public RequestBuilder setTimeoutInMillis(long timeoutInMillis) { + this.timeoutInMillis = timeoutInMillis; return this; } @@ -207,6 +212,16 @@ public class RequestBuilder { return this; } + public RequestBuilder enableBackOff(boolean enableBackOff) { + this.enableBackOff = enableBackOff; + return this; + } + + public RequestBuilder setBackOff(BackOff backOff) { + this.backOff = backOff; + return this; + } + public RequestBuilder setUserAgent(String userAgent) { this.userAgent = userAgent; return this; @@ -255,14 +270,10 @@ public class RequestBuilder { throw new IllegalStateException("host in URL not defined: " + url); } if (uri != null) { - if (this.url != null) { - try { - url = URL.base(url).resolve(uri); - } catch (URLSyntaxException e) { - throw new IllegalArgumentException(e); - } - } else { - url(uri); + try { + url = URL.base(url).resolve(uri); + } catch (URLSyntaxException e) { + throw new IllegalArgumentException(e); } } // add explicit parameters to URL @@ -320,7 +331,7 @@ public class RequestBuilder { validatedHeaders.remove(headerName); } return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, uri, content, - timeout, followRedirect, maxRedirects, 0); + timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff); } private void addHeader(AsciiString name, Object value) { diff --git a/src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java b/src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java deleted file mode 100644 index fcb52db..0000000 --- a/src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.xbib.netty.http.client.handler; - -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.socket.ChannelInputShutdownReadComplete; -import io.netty.handler.ssl.SslCloseCompletionEvent; - -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * A Netty handler that logs user events and find expetced ones. - */ -@ChannelHandler.Sharable -class UserEventLogger extends ChannelInboundHandlerAdapter { - - private static final Logger logger = Logger.getLogger(UserEventLogger.class.getName()); - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - logger.log(Level.FINE, () -> "got user event " + evt); - if (evt instanceof SslCloseCompletionEvent || - evt instanceof ChannelInputShutdownReadComplete) { - logger.log(Level.FINE, () -> "user event is expected: " + evt); - return; - } - super.userEventTriggered(ctx, evt); - } -} diff --git a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java index 850d17d..399083d 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java @@ -6,87 +6,59 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslHandler; import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.HttpAddress; -import org.xbib.netty.http.client.handler.TrafficLoggingHandler; -import javax.net.ssl.SNIHostName; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLParameters; -import java.util.Collections; +import java.util.logging.Level; +import java.util.logging.Logger; public class HttpChannelInitializer extends ChannelInitializer { + private static final Logger logger = Logger.getLogger(HttpChannelInitializer.class.getName()); + private final ClientConfig clientConfig; private final HttpAddress httpAddress; + private final SslHandler sslHandler; + private final HttpResponseHandler httpResponseHandler; - public HttpChannelInitializer(ClientConfig clientConfig, HttpAddress httpAddress, HttpResponseHandler httpResponseHandler) { + public HttpChannelInitializer(ClientConfig clientConfig, + HttpAddress httpAddress, + SslHandler sslHandler, + HttpResponseHandler httpResponseHandler) { this.clientConfig = clientConfig; this.httpAddress = httpAddress; + this.sslHandler = sslHandler; this.httpResponseHandler = httpResponseHandler; } @Override - protected void initChannel(SocketChannel ch) { + public void initChannel(SocketChannel channel) { if (clientConfig.isDebug()) { - ch.pipeline().addLast(new TrafficLoggingHandler()); + channel.pipeline().addLast(new TrafficLoggingHandler(LogLevel.DEBUG)); } if (httpAddress.isSecure()) { - configureEncryptedHttp1(ch); + configureEncrypted(channel); } else { - configureCleartextHttp1(ch); + configureCleartext(channel); + } + if (clientConfig.isDebug()) { + logger.log(Level.FINE, "HTTP 1 channel initialized: " + channel.pipeline().names()); } } - private void configureEncryptedHttp1(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - try { - SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() - .sslProvider(clientConfig.getSslProvider()) - .keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(), - clientConfig.getKeyPassword()) - .ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter()); - if (clientConfig.getSslContextProvider() != null) { - sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); - } - if (clientConfig.getTrustManagerFactory() != null) { - sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory()); - } - SslContext sslContext = sslContextBuilder.build(); - SslHandler sslHandler = sslContext.newHandler(ch.alloc()); - SSLEngine engine = sslHandler.engine(); - if (clientConfig.isServerNameIdentification()) { - String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName(); - SSLParameters params = engine.getSSLParameters(); - params.setServerNames(Collections.singletonList(new SNIHostName(fullQualifiedHostname))); - engine.setSSLParameters(params); - } - pipeline.addLast(sslHandler); - switch (clientConfig.getClientAuthMode()) { - case NEED: - engine.setNeedClientAuth(true); - break; - case WANT: - engine.setWantClientAuth(true); - break; - default: - break; - } - } catch (SSLException e) { - throw new IllegalStateException("unable to configure SSL: " + e.getMessage(), e); - } - configureCleartextHttp1(ch); + private void configureEncrypted(SocketChannel channel) { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast(sslHandler); + configureCleartext(channel); } - private void configureCleartextHttp1(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); + private void configureCleartext(SocketChannel channel) { + ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpClientCodec(clientConfig.getMaxInitialLineLength(), clientConfig.getMaxHeadersSize(), clientConfig.getMaxChunkSize())); if (clientConfig.isEnableGzip()) { diff --git a/src/main/java/org/xbib/netty/http/client/handler/TrafficLoggingHandler.java b/src/main/java/org/xbib/netty/http/client/handler/http1/TrafficLoggingHandler.java similarity index 88% rename from src/main/java/org/xbib/netty/http/client/handler/TrafficLoggingHandler.java rename to src/main/java/org/xbib/netty/http/client/handler/http1/TrafficLoggingHandler.java index bd62909..333a2f0 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/TrafficLoggingHandler.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http1/TrafficLoggingHandler.java @@ -1,4 +1,4 @@ -package org.xbib.netty.http.client.handler; +package org.xbib.netty.http.client.handler.http1; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; @@ -13,8 +13,8 @@ import io.netty.handler.logging.LoggingHandler; @ChannelHandler.Sharable public class TrafficLoggingHandler extends LoggingHandler { - public TrafficLoggingHandler() { - super("client", LogLevel.TRACE); + public TrafficLoggingHandler(LogLevel level) { + super("client", level); } @Override diff --git a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java index c4493aa..c3c340b 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java @@ -7,25 +7,14 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2FrameLogger; -import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder; import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.ssl.SupportedCipherSuiteFilter; import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.HttpAddress; -import javax.net.ssl.SNIHostName; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLParameters; -import java.util.Collections; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,30 +26,69 @@ public class Http2ChannelInitializer extends ChannelInitializer { private final HttpAddress httpAddress; + private final SslHandler sslHandler; + private final Http2SettingsHandler http2SettingsHandler; private final Http2ResponseHandler http2ResponseHandler; public Http2ChannelInitializer(ClientConfig clientConfig, HttpAddress httpAddress, + SslHandler sslHandler, Http2SettingsHandler http2SettingsHandler, Http2ResponseHandler http2ResponseHandler) { this.clientConfig = clientConfig; this.httpAddress = httpAddress; + this.sslHandler = sslHandler; this.http2SettingsHandler = http2SettingsHandler; this.http2ResponseHandler = http2ResponseHandler; } /** - * The channel initialization for HTTP/2 is always encrypted. - * The reason is there is no known HTTP/2 server supporting cleartext. + * The channel initialization for HTTP/2. * - * @param ch socket channel + * @param channel socket channel */ @Override - protected void initChannel(SocketChannel ch) { + public void initChannel(SocketChannel channel) { + if (httpAddress.isSecure()) { + configureEncrypted(channel); + } else { + configureCleartext(channel); + } + if (clientConfig.isDebug()) { + logger.log(Level.FINE, "HTTP/2 channel initialized: " + channel.pipeline().names()); + } + } + + private void configureEncrypted(SocketChannel channel) { + channel.pipeline().addLast(sslHandler); + ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler("") { + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { + if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { + ctx.pipeline().addLast(newConnectionHandler(), http2SettingsHandler, http2ResponseHandler); + if (clientConfig.isDebug()) { + logger.log(Level.FINE, "after negotiation: " + ctx.pipeline().names()); + } + return; + } + // we do not fall back to HTTP1 + ctx.close(); + throw new IllegalStateException("protocol not accepted: " + protocol); + } + }; + channel.pipeline().addLast(negotiationHandler); +} + + private void configureCleartext(SocketChannel ch) { + ch.pipeline().addLast(newConnectionHandler(), http2SettingsHandler, http2ResponseHandler); + } + + private Http2ConnectionHandler newConnectionHandler() { Http2Connection http2Connection = new DefaultHttp2Connection(false); HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder() + .initialSettings(clientConfig.getHttp2Settings()) .connection(http2Connection) .frameListener(new Http2PushPromiseHandler(http2Connection, new InboundHttp2ToHttpAdapterBuilder(http2Connection) @@ -68,48 +96,8 @@ public class Http2ChannelInitializer extends ChannelInitializer { .propagateSettings(true) .build())); if (clientConfig.isDebug()) { - http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client")); - } - Http2ConnectionHandler http2ConnectionHandler = http2ConnectionHandlerBuilder.build(); - try { - SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() - .sslProvider(clientConfig.getSslProvider()) - .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) - .applicationProtocolConfig(new ApplicationProtocolConfig( - ApplicationProtocolConfig.Protocol.ALPN, - ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, - ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, - ApplicationProtocolNames.HTTP_2)); - if (clientConfig.getSslContextProvider() != null) { - sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); - } - if (clientConfig.getTrustManagerFactory() != null) { - sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory()); - } - SslContext sslContext = sslContextBuilder.build(); - SslHandler sslHandler = sslContext.newHandler(ch.alloc()); - SSLEngine engine = sslHandler.engine(); - if (clientConfig.isServerNameIdentification()) { - String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName(); - SSLParameters params = engine.getSSLParameters(); - params.setServerNames(Collections.singletonList(new SNIHostName(fullQualifiedHostname))); - engine.setSSLParameters(params); - } - ch.pipeline().addLast(sslHandler); - ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler("") { - @Override - protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { - if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { - ctx.pipeline().addLast(http2ConnectionHandler, http2SettingsHandler, http2ResponseHandler); - return; - } - ctx.close(); - throw new IllegalStateException("unknown protocol: " + protocol); - } - }; - ch.pipeline().addLast(negotiationHandler); - } catch (SSLException e) { - logger.log(Level.SEVERE, e.getMessage(), e); + http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(clientConfig.getDebugLogLevel(), "client")); } + return http2ConnectionHandlerBuilder.build(); } } diff --git a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java b/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java similarity index 60% rename from src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java rename to src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java index 26cb3ed..87134bb 100644 --- a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java +++ b/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java @@ -6,14 +6,18 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpVersion; import io.netty.util.AttributeKey; import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -22,12 +26,16 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; -public class SimpleChannelPool implements Pool { +public class BoundedChannelPool implements Pool { - private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName()); + private static final Logger logger = Logger.getLogger(BoundedChannelPool.class.getName()); private final Semaphore semaphore; + private final HttpVersion httpVersion; + + private final boolean isSecure; + private final ChannelPoolHandler channelPoolhandler; private final List nodes; @@ -52,6 +60,8 @@ public class SimpleChannelPool implements Pool { /** * @param semaphore the concurrency level + * @param httpVersion the HTTP version of the pool connections + * @param isSecure if this pool has secure connections * @param nodes the endpoint nodes, any element may contain the port (followed after ":") * to override the defaultPort argument * @param bootstrap bootstrap instance @@ -59,16 +69,19 @@ public class SimpleChannelPool implements Pool { * @param retriesPerNode the max count of the subsequent connection failures to the node before * the node will be excluded from the pool. If set to 0, the value is ignored. */ - public SimpleChannelPool(Semaphore semaphore, List nodes, Bootstrap bootstrap, - ChannelPoolHandler channelPoolHandler, int retriesPerNode) { + public BoundedChannelPool(Semaphore semaphore, HttpVersion httpVersion, boolean isSecure, + List nodes, Bootstrap bootstrap, + ChannelPoolHandler channelPoolHandler, int retriesPerNode) { this.semaphore = semaphore; + this.httpVersion = httpVersion; + this.isSecure = isSecure; this.channelPoolhandler = channelPoolHandler; this.nodes = nodes; this.retriesPerNode = retriesPerNode; this.lock = new ReentrantLock(); this.attributeKey = AttributeKey.valueOf("poolKey"); if (nodes == null || nodes.isEmpty()) { - throw new IllegalArgumentException("empty nodes array argument"); + throw new IllegalArgumentException("nodes must not be empty"); } this.numberOfNodes = nodes.size(); bootstraps = new HashMap<>(numberOfNodes); @@ -77,46 +90,48 @@ public class SimpleChannelPool implements Pool { counts = new HashMap<>(numberOfNodes); failedCounts = new HashMap<>(numberOfNodes); for (K node : nodes) { + ChannelPoolInitializer initializer = new ChannelPoolInitializer(node, channelPoolHandler); bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress()) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel channel) throws Exception { - if(!channel.eventLoop().inEventLoop()) { - throw new IllegalStateException(); - } - if (channelPoolHandler != null) { - channelPoolHandler.channelCreated(channel); - } - } - })); + .handler(initializer)); availableChannels.put(node, new ConcurrentLinkedQueue<>()); counts.put(node, 0); failedCounts.put(node, 0); } } + public HttpVersion getVersion() { + return httpVersion; + } + + public boolean isSecure() { + return isSecure; + } + + public AttributeKey getAttributeKey() { + return attributeKey; + } + @Override - public void prepare(int count) throws ConnectException { - if (count > 0) { - for (int i = 0; i < count; i ++) { - Channel channel = connectToAnyNode(); - if (channel == null) { - throw new ConnectException("failed to prepare the connections"); - } - K nodeAddr = channel.attr(attributeKey).get(); - if (channel.isActive()) { - Queue channelQueue = availableChannels.get(nodeAddr); - if (channelQueue != null) { - channelQueue.add(channel); - } - } else { - channel.close(); - } - } - logger.log(Level.FINE,"prepared " + count + " connections"); - } else { - throw new IllegalArgumentException("Connection count should be > 0, but got " + count); + public void prepare(int channelCount) throws ConnectException { + if (channelCount <= 0) { + throw new IllegalArgumentException("channel count must be greater zero, but got " + channelCount); } + for (int i = 0; i < channelCount; i++) { + Channel channel = newConnection(); + if (channel == null) { + throw new ConnectException("failed to prepare"); + } + K key = channel.attr(attributeKey).get(); + if (channel.isActive()) { + Queue channelQueue = availableChannels.get(key); + if (channelQueue != null) { + channelQueue.add(channel); + } + } else { + channel.close(); + } + } + logger.log(Level.FINE,"prepared " + channelCount + " channels"); } @Override @@ -124,7 +139,7 @@ public class SimpleChannelPool implements Pool { Channel channel = null; if (semaphore.tryAcquire()) { if ((channel = poll()) == null) { - channel = connectToAnyNode(); + channel = newConnection(); } if (channel == null) { semaphore.release(); @@ -150,7 +165,7 @@ public class SimpleChannelPool implements Pool { Channel channel; for (int i = 0; i < availableCount; i ++) { if (null == (channel = poll())) { - channel = connectToAnyNode(); + channel = newConnection(); } if (channel == null) { semaphore.release(availableCount - i); @@ -167,18 +182,23 @@ public class SimpleChannelPool implements Pool { @Override public void release(Channel channel) throws Exception { - K nodeAddr = channel.attr(attributeKey).get(); - if (channel.isActive()) { - Queue channelQueue = availableChannels.get(nodeAddr); - if (channelQueue != null) { - channelQueue.add(channel); + try { + if (channel != null) { + if (channel.isActive()) { + K key = channel.attr(attributeKey).get(); + Queue channelQueue = availableChannels.get(key); + if (channelQueue != null) { + channelQueue.add(channel); + } + } else if (channel.isOpen()) { + channel.close(); + } + if (channelPoolhandler != null) { + channelPoolhandler.channelReleased(channel); + } } + } finally { semaphore.release(); - } else { - channel.close(); - } - if (channelPoolhandler != null) { - channelPoolhandler.channelReleased(channel); } } @@ -193,65 +213,63 @@ public class SimpleChannelPool implements Pool { public void close() { lock.lock(); try { - int closedConnCount = 0; - for (K nodeAddr : availableChannels.keySet()) { - for (Channel conn : availableChannels.get(nodeAddr)) { - if (conn.isOpen()) { - conn.close(); - closedConnCount++; - } + int count = 0; + Set channelSet = new HashSet<>(); + for (Map.Entry> entry : availableChannels.entrySet()) { + channelSet.addAll(entry.getValue()); + } + for (Map.Entry> entry : channels.entrySet()) { + channelSet.addAll(entry.getValue()); + } + for (Channel channel : channelSet) { + if (channel != null && channel.isOpen()) { + logger.log(Level.FINE, "closing channel " + channel); + channel.close(); + count++; } } availableChannels.clear(); - for (K nodeAddr : channels.keySet()) { - for (Channel channel : channels.get(nodeAddr)) { - if (channel != null && channel.isOpen()) { - channel.close(); - closedConnCount++; - } - } - } channels.clear(); bootstraps.clear(); counts.clear(); - logger.log(Level.FINE, "closed " + closedConnCount + " connections"); + logger.log(Level.FINE, "closed " + count + " connections"); } finally { lock.unlock(); } } - private Channel connectToAnyNode() throws ConnectException { + private Channel newConnection() throws ConnectException { Channel channel = null; - K nodeAddr = null; - K nextNodeAddr; + K key = null; + K nextKey; int min = Integer.MAX_VALUE; int next; int i = ThreadLocalRandom.current().nextInt(numberOfNodes); for (int j = i; j < numberOfNodes; j ++) { - nextNodeAddr = nodes.get(j % numberOfNodes); - next = counts.get(nextNodeAddr); - if(next == 0) { - nodeAddr = nextNodeAddr; + nextKey = nodes.get(j % numberOfNodes); + next = counts.get(nextKey); + if (next == 0) { + key = nextKey; break; } else if (next < min) { min = next; - nodeAddr = nextNodeAddr; + key = nextKey; } } - if (nodeAddr != null) { - logger.log(Level.FINE, "trying connection to " + nodeAddr); + if (key != null) { + logger.log(Level.FINE, "trying connection to " + key); try { - channel = connect(nodeAddr); + channel = connect(key); } catch (Exception e) { - logger.log(Level.WARNING, "failed to create a new connection to " + nodeAddr + ": " + e.toString()); + logger.log(Level.WARNING, "failed to create a new connection to " + key + ": " + e.toString()); if (retriesPerNode > 0) { - int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1; - failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount); + int selectedNodeFailedConnAttemptsCount = failedCounts.get(key) + 1; + failedCounts.put(key, selectedNodeFailedConnAttemptsCount); if (selectedNodeFailedConnAttemptsCount > retriesPerNode) { - logger.log(Level.WARNING, "failed to connect to the node " + nodeAddr + " " + logger.log(Level.WARNING, "failed to connect to the node " + key + " " + selectedNodeFailedConnAttemptsCount + " times, " + "excluding the node from the connection pool"); - counts.put(nodeAddr, Integer.MAX_VALUE); + counts.put(key, Integer.MAX_VALUE); boolean allNodesExcluded = true; for (K node : nodes) { if (counts.get(node) < Integer.MAX_VALUE) { @@ -272,22 +290,22 @@ public class SimpleChannelPool implements Pool { } } if (channel != null) { - channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel)); - channel.attr(attributeKey).set(nodeAddr); - channels.computeIfAbsent(nodeAddr, node -> new ArrayList<>()).add(channel); + channel.closeFuture().addListener(new CloseChannelListener(key, channel)); + channel.attr(attributeKey).set(key); + channels.computeIfAbsent(key, node -> new ArrayList<>()).add(channel); synchronized (counts) { - counts.put(nodeAddr, counts.get(nodeAddr) + 1); + counts.put(key, counts.get(key) + 1); } - if(retriesPerNode > 0) { - failedCounts.put(nodeAddr, 0); + if (retriesPerNode > 0) { + failedCounts.put(key, 0); } - logger.log(Level.FINE,"new connection to " + nodeAddr + " created"); + logger.log(Level.FINE,"new connection to " + key + " created"); } return channel; } - private Channel connect(K addr) throws Exception { - Bootstrap bootstrap = bootstraps.get(addr); + private Channel connect(K key) throws Exception { + Bootstrap bootstrap = bootstraps.get(key); if (bootstrap != null) { return bootstrap.connect().sync().channel(); } @@ -312,26 +330,26 @@ public class SimpleChannelPool implements Pool { private class CloseChannelListener implements ChannelFutureListener { - private final K nodeAddr; + private final K key; private final Channel channel; - private CloseChannelListener(K nodeAddr, Channel channel) { - this.nodeAddr = nodeAddr; + private CloseChannelListener(K key, Channel channel) { + this.key = key; this.channel = channel; } @Override public void operationComplete(ChannelFuture future) { - logger.log(Level.FINE,"connection to " + nodeAddr + " closed"); + logger.log(Level.FINE,"connection to " + key + " closed"); lock.lock(); try { synchronized (counts) { - if (counts.containsKey(nodeAddr)) { - counts.put(nodeAddr, counts.get(nodeAddr) - 1); + if (counts.containsKey(key)) { + counts.put(key, counts.get(key) - 1); } } synchronized (channels) { - List channels = SimpleChannelPool.this.channels.get(nodeAddr); + List channels = BoundedChannelPool.this.channels.get(key); if (channels != null) { channels.remove(channel); } @@ -342,4 +360,27 @@ public class SimpleChannelPool implements Pool { } } } + + class ChannelPoolInitializer extends ChannelInitializer { + + private final K key; + + private final ChannelPoolHandler channelPoolHandler; + + ChannelPoolInitializer(K key, ChannelPoolHandler channelPoolHandler) { + this.key = key; + this.channelPoolHandler = channelPoolHandler; + } + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + if (!channel.eventLoop().inEventLoop()) { + throw new IllegalStateException(); + } + channel.attr(attributeKey).set(key); + if (channelPoolHandler != null) { + channelPoolHandler.channelCreated(channel); + } + } + } } diff --git a/src/main/java/org/xbib/netty/http/client/rest/RestClient.java b/src/main/java/org/xbib/netty/http/client/rest/RestClient.java index a25870f..6ba3742 100644 --- a/src/main/java/org/xbib/netty/http/client/rest/RestClient.java +++ b/src/main/java/org/xbib/netty/http/client/rest/RestClient.java @@ -36,7 +36,13 @@ public class RestClient { public String asString() { ByteBuf byteBuf = response != null ? response.content() : null; - return byteBuf != null && byteBuf.isReadable() ? response.content().toString(StandardCharsets.UTF_8) : null; + try { + return byteBuf != null && byteBuf.isReadable() ? response.content().toString(StandardCharsets.UTF_8) : null; + } finally { + if (byteBuf != null) { + byteBuf.release(); + } + } } public static RestClient get(String urlString) throws IOException { diff --git a/src/main/java/org/xbib/netty/http/client/retry/BackOff.java b/src/main/java/org/xbib/netty/http/client/retry/BackOff.java index 4025363..b346438 100644 --- a/src/main/java/org/xbib/netty/http/client/retry/BackOff.java +++ b/src/main/java/org/xbib/netty/http/client/retry/BackOff.java @@ -14,7 +14,7 @@ public interface BackOff { /** * Reset to initial state. */ - void reset() throws IOException; + void reset(); /** * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to @@ -33,7 +33,7 @@ public interface BackOff { } * */ - long nextBackOffMillis() throws IOException; + long nextBackOffMillis(); /** * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried diff --git a/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java b/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java index dd0638a..8f20b8c 100644 --- a/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java +++ b/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java @@ -137,22 +137,19 @@ public class ExponentialBackOff implements BackOff { /** * @param builder builder */ - protected ExponentialBackOff(Builder builder) { + private ExponentialBackOff(Builder builder) { initialIntervalMillis = builder.initialIntervalMillis; randomizationFactor = builder.randomizationFactor; multiplier = builder.multiplier; maxIntervalMillis = builder.maxIntervalMillis; maxElapsedTimeMillis = builder.maxElapsedTimeMillis; nanoClock = builder.nanoClock; - //Preconditions.checkArgument(initialIntervalMillis > 0); - //Preconditions.checkArgument(0 <= randomizationFactor && randomizationFactor < 1); - //Preconditions.checkArgument(multiplier >= 1); - //Preconditions.checkArgument(maxIntervalMillis >= initialIntervalMillis); - //Preconditions.checkArgument(maxElapsedTimeMillis > 0); reset(); } - /** Sets the interval back to the initial retry interval and restarts the timer. */ + /** + * Sets the interval back to the initial retry interval and restarts the timer. + */ public final void reset() { currentIntervalMillis = initialIntervalMillis; startTimeNanos = nanoClock.nanoTime(); @@ -275,6 +272,29 @@ public class ExponentialBackOff implements BackOff { } } + /** + * Nano clock which can be used to measure elapsed time in nanoseconds. + * + *

+ * The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations + * may be used for testing. + *

+ * + */ + public interface NanoClock { + + /** + * Returns the current value of the most precise available system timer, in nanoseconds for use to + * measure elapsed time, to match the behavior of {@link System#nanoTime()}. + */ + long nanoTime(); + + /** + * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}. + */ + NanoClock SYSTEM = System::nanoTime; + } + /** * Builder for {@link ExponentialBackOff}. * @@ -285,7 +305,7 @@ public class ExponentialBackOff implements BackOff { public static class Builder { /** The initial retry interval in milliseconds. */ - int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS; + private int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS; /** * The randomization factor to use for creating a range around the retry interval. @@ -295,32 +315,53 @@ public class ExponentialBackOff implements BackOff { * above the retry interval. *

*/ - double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR; + private double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR; - /** The value to multiply the current interval with for each retry attempt. */ - double multiplier = DEFAULT_MULTIPLIER; + /** + * The value to multiply the current interval with for each retry attempt. + */ + private double multiplier = DEFAULT_MULTIPLIER; /** * The maximum value of the back off period in milliseconds. Once the retry interval reaches * this value it stops increasing. */ - int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS; + private int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS; /** * The maximum elapsed time in milliseconds after instantiating {@link ExponentialBackOff} or * calling {@link #reset()} after which {@link #nextBackOffMillis()} returns * {@link BackOff#STOP}. */ - int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS; + private int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS; - /** Nano clock. */ - NanoClock nanoClock = NanoClock.SYSTEM; + /** + * Nano clock. + */ + private NanoClock nanoClock = NanoClock.SYSTEM; public Builder() { } - /** Builds a new instance of {@link ExponentialBackOff}. */ + /** + * Builds a new instance of {@link ExponentialBackOff}. + * */ public ExponentialBackOff build() { + if (initialIntervalMillis <= 0) { + throw new IllegalArgumentException(); + } + if (!(0 <= randomizationFactor && randomizationFactor < 1)) { + throw new IllegalArgumentException(); + } + if (multiplier < 1) { + throw new IllegalArgumentException(); + } + if ((maxIntervalMillis < initialIntervalMillis)) { + throw new IllegalArgumentException(); + } + if (maxElapsedTimeMillis <= 0) { + throw new IllegalArgumentException(); + } return new ExponentialBackOff(this); } @@ -480,7 +521,9 @@ public class ExponentialBackOff implements BackOff { *

*/ public Builder setNanoClock(NanoClock nanoClock) { - this.nanoClock = nanoClock; //Preconditions.checkNotNull(nanoClock); + if (nanoClock != null) { + this.nanoClock = nanoClock; + } return this; } } diff --git a/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java b/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java deleted file mode 100644 index 11e8e13..0000000 --- a/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2013 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.xbib.netty.http.client.retry; - -/** - * Nano clock which can be used to measure elapsed time in nanoseconds. - * - *

- * The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations - * may be used for testing. - *

- * - * @since 1.14 - * @author Yaniv Inbar - */ -public interface NanoClock { - - /** - * Returns the current value of the most precise available system timer, in nanoseconds for use to - * measure elapsed time, to match the behavior of {@link System#nanoTime()}. - */ - long nanoTime(); - - /** - * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}. - */ - NanoClock SYSTEM = new NanoClock() { - public long nanoTime() { - return System.nanoTime(); - } - }; -} diff --git a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index 617afdc..98c8ff6 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -5,7 +5,9 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.ClientCookieEncoder; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http2.HttpConversionUtil; @@ -16,6 +18,9 @@ import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.RequestBuilder; +import org.xbib.netty.http.client.listener.CookieListener; +import org.xbib.netty.http.client.listener.HttpHeadersListener; +import org.xbib.netty.http.client.retry.BackOff; import java.io.IOException; import java.net.ConnectException; @@ -48,6 +53,8 @@ abstract class BaseTransport implements Transport { protected SortedMap requests; + protected Throwable throwable; + private Map cookieBox; BaseTransport(Client client, HttpAddress httpAddress) { @@ -56,43 +63,55 @@ abstract class BaseTransport implements Transport { this.requests = new ConcurrentSkipListMap<>(); } - @Override - public HttpAddress httpAddress() { - return httpAddress; - } - @Override public Transport execute(Request request) throws IOException { ensureConnect(); - // some HTTP 1.1 servers like Elasticsearch do not understand full URIs in HTTP command line - String uri = request.httpVersion().majorVersion() < 2 ? + if (throwable != null) { + return this; + } + // Some HTTP 1 servers do not understand URIs in HTTP command line in spite of RFC 7230. + // The "origin form" requires a "Host" header. + // Our algorithm is: use always "origin form" for HTTP 1, use absolute form for HTTP 2. + // The reason is that Netty derives the HTTP/2 scheme header from the absolute form. + String uri = request.httpVersion().majorVersion() == 1 ? request.base().relativeReference() : request.base().toString(); FullHttpRequest fullHttpRequest = request.content() == null ? new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) : new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri, request.content()); - logger.log(Level.INFO, fullHttpRequest.toString()); - Integer streamId = nextStream(); - if (streamId != null) { - request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); + try { + Integer streamId = nextStream(); + if (streamId != null && streamId > 0) { + request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); + } else { + if (request.httpVersion().majorVersion() == 2) { + logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName()); + } + } + // add matching cookies from box (previous requests) and new cookies from request builder + Collection cookies = new ArrayList<>(); + cookies.addAll(matchCookiesFromBox(request)); + cookies.addAll(matchCookies(request)); + if (!cookies.isEmpty()) { + request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies)); + } + // add stream-id and cookie headers + fullHttpRequest.headers().set(request.headers()); + if (streamId != null) { + requests.put(streamId, request); + } + if (channel.isWritable()) { + channel.writeAndFlush(fullHttpRequest); + + } + } finally { + request.close(); } - // add matching cookies from box (previous requests) and new cookies from request builder - Collection cookies = new ArrayList<>(); - cookies.addAll(matchCookiesFromBox(request)); - cookies.addAll(matchCookies(request)); - if (!cookies.isEmpty()) { - request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies)); - } - // add stream-id and cookie headers - fullHttpRequest.headers().set(request.headers()); - requests.put(streamId, request); - logger.log(Level.FINE, () -> "streamId = " + streamId + " writing request = " + fullHttpRequest); - channel.writeAndFlush(fullHttpRequest); return this; } /** - * Experimental. + * Experimental method for executing in a wrapping completable future. * @param request request * @param supplier supplier * @param supplier result @@ -102,44 +121,86 @@ abstract class BaseTransport implements Transport { public CompletableFuture execute(Request request, Function supplier) throws IOException { final CompletableFuture completableFuture = new CompletableFuture<>(); - //request.setExceptionListener(completableFuture::completeExceptionally); request.setResponseListener(response -> completableFuture.complete(supplier.apply(response))); execute(request); return completableFuture; } @Override - public synchronized void close() { + public synchronized void close() throws IOException { get(); - if (channel != null) { - channel.close(); - channel = null; - } + client.releaseChannel(channel); } - protected void ensureConnect() throws IOException { - if (channel == null) { - try { - channel = client.newChannel(httpAddress); - channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this); - awaitSettings(); - } catch (InterruptedException e) { - throw new ConnectException("unable to connect to " + httpAddress); + @Override + public boolean isFailed() { + return throwable != null; + } + + @Override + public Throwable getFailure() { + return throwable; + } + + @Override + public void headersReceived(Integer streamId, HttpHeaders httpHeaders) { + Request request = fromStreamId(streamId); + if (request != null) { + HttpHeadersListener httpHeadersListener = request.getHeadersListener(); + if (httpHeadersListener != null) { + httpHeadersListener.onHeaders(httpHeaders); + } + for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { + Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); + addCookie(cookie); + CookieListener cookieListener = request.getCookieListener(); + if (cookieListener != null) { + cookieListener.onCookie(cookie); + } } } } - protected Request continuation(Integer streamId, FullHttpResponse httpResponse) throws URLSyntaxException { + private void ensureConnect() throws IOException { + if (channel == null) { + channel = client.newChannel(httpAddress); + if (channel != null) { + channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this); + awaitSettings(); + } else { + ConnectException connectException; + if (httpAddress != null) { + connectException = new ConnectException("unable to connect to " + httpAddress); + } else if (client.hasPooledConnections()){ + connectException = new ConnectException("unable to get channel from pool"); + } else { + // if API misuse + connectException = new ConnectException("unable to get channel"); + } + this.throwable = connectException; + this.channel = null; + throw connectException; + } + } + } + + protected Request fromStreamId(Integer streamId) { + if (streamId == null) { + streamId = requests.lastKey(); + } + return requests.get(streamId); + } + + protected Request continuation(Request request, FullHttpResponse httpResponse) throws URLSyntaxException { if (httpResponse == null) { return null; } - Request request = fromStreamId(streamId); if (request == null) { - // push promise + // push promise or something else return null; } try { - if (request.checkRedirect()) { + if (request.canRedirect()) { int status = httpResponse.status().code(); switch (status) { case 300: @@ -152,7 +213,7 @@ abstract class BaseTransport implements Transport { String location = httpResponse.headers().get(HttpHeaderNames.LOCATION); location = new PercentDecoder(StandardCharsets.UTF_8.newDecoder()).decode(location); if (location != null) { - logger.log(Level.INFO, "found redirect location: " + location); + logger.log(Level.FINE, "found redirect location: " + location); URL redirUrl = URL.base(request.base()).resolve(location); HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod(); RequestBuilder newHttpRequestBuilder = Request.builder(method) @@ -160,45 +221,75 @@ abstract class BaseTransport implements Transport { .setVersion(request.httpVersion()) .setHeaders(request.headers()) .content(request.content()); - // TODO(jprante) convencience to copy pathAndQuery from one request to another request.base().getQueryParams().forEach(pair -> newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond()) ); request.cookies().forEach(newHttpRequestBuilder::addCookie); Request newHttpRequest = newHttpRequestBuilder.build(); newHttpRequest.setResponseListener(request.getResponseListener()); - //newHttpRequest.setExceptionListener(request.getExceptionListener()); newHttpRequest.setHeadersListener(request.getHeadersListener()); newHttpRequest.setCookieListener(request.getCookieListener()); - //newHttpRequest.setPushListener(request.getPushListener()); StringBuilder hostAndPort = new StringBuilder(); hostAndPort.append(redirUrl.getHost()); if (redirUrl.getPort() != null) { hostAndPort.append(':').append(redirUrl.getPort()); } newHttpRequest.headers().set(HttpHeaderNames.HOST, hostAndPort.toString()); - logger.log(Level.INFO, "redirect url: " + redirUrl + + logger.log(Level.FINE, "redirect url: " + redirUrl + " old request: " + request.toString() + " new request: " + newHttpRequest.toString()); return newHttpRequest; } break; default: - logger.log(Level.FINE, "no redirect because of status code " + status); break; } } } catch (MalformedInputException | UnmappableCharacterException e) { - logger.log(Level.WARNING, e.getMessage(), e); + this.throwable = e; } return null; } - protected Request fromStreamId(Integer streamId) { - if (streamId == null) { - streamId = requests.lastKey(); + protected Request retry(Request request, FullHttpResponse httpResponse) { + if (httpResponse == null) { + return null; } - return requests.get(streamId); + if (request == null) { + // push promise or something else + return null; + } + if (request.isBackOff()) { + BackOff backOff = request.getBackOff() != null ? request.getBackOff() : + client.getClientConfig().getBackOff(); + int status = httpResponse.status().code(); + switch (status) { + case 403: + case 404: + case 500: + case 502: + case 503: + case 504: + case 507: + case 509: + if (backOff != null) { + long millis = backOff.nextBackOffMillis(); + if (millis != BackOff.STOP) { + logger.log(Level.FINE, "status = " + status + " backing off request by " + millis + " milliseconds"); + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + // ignore + } + return request; + } + } + break; + default: + break; + } + } + return null; } public void setCookieBox(Map cookieBox) { @@ -209,7 +300,7 @@ abstract class BaseTransport implements Transport { return cookieBox; } - public void addCookie(Cookie cookie) { + private void addCookie(Cookie cookie) { if (cookieBox == null) { this.cookieBox = Collections.synchronizedMap(new LRUCache(32)); } @@ -241,7 +332,8 @@ abstract class BaseTransport implements Transport { return (secureScheme && cookie.isSecure()) || (!secureScheme && !cookie.isSecure()); } - class LRUCache extends LinkedHashMap { + @SuppressWarnings("serial") + static class LRUCache extends LinkedHashMap { private final int cacheSize; diff --git a/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java index 4edb3a4..69ccd5a 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java @@ -2,18 +2,12 @@ package org.xbib.netty.http.client.transport; import io.netty.channel.Channel; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.cookie.ClientCookieDecoder; -import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import org.xbib.net.URLSyntaxException; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.listener.CookieListener; -import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpResponseListener; import java.io.IOException; @@ -27,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; -public class Http2Transport extends BaseTransport implements Transport { +public class Http2Transport extends BaseTransport { private static final Logger logger = Logger.getLogger(Http2Transport.class.getName()); @@ -41,7 +35,9 @@ public class Http2Transport extends BaseTransport implements Transport { super(client, httpAddress); streamIdCounter = new AtomicInteger(3); streamidPromiseMap = new ConcurrentSkipListMap<>(); - settingsPromise = new CompletableFuture<>(); + settingsPromise = (httpAddress != null && httpAddress.isSecure()) || + (client.hasPooledConnections() && client.getPool().isSecure()) ? + new CompletableFuture<>() : null; } @Override @@ -69,62 +65,47 @@ public class Http2Transport extends BaseTransport implements Transport { public void awaitSettings() { if (settingsPromise != null) { try { - settingsPromise.get(client.getTimeout(), TimeUnit.MILLISECONDS); + settingsPromise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { settingsPromise.completeExceptionally(e); } - } else { - logger.log(Level.WARNING, "waiting for settings but no promise present"); } } @Override public void responseReceived(Integer streamId, FullHttpResponse fullHttpResponse) { if (streamId == null) { - logger.log(Level.WARNING, "unexpected message received: " + fullHttpResponse); + logger.log(Level.WARNING, "no stream ID, unexpected message received: " + fullHttpResponse); return; } CompletableFuture promise = streamidPromiseMap.get(streamId); if (promise == null) { - logger.log(Level.WARNING, "response received for unknown stream id " + streamId); - } else { - Request request = fromStreamId(streamId); - if (request != null) { - HttpResponseListener responseListener = request.getResponseListener(); - if (responseListener != null) { - responseListener.onResponse(fullHttpResponse); - } - try { - request = continuation(streamId, fullHttpResponse); - if (request != null) { - // synchronous call here - client.continuation(this, request); - } - } catch (URLSyntaxException | IOException e) { - logger.log(Level.WARNING, e.getMessage(), e); - } - } - // complete origin - promise.complete(true); + logger.log(Level.WARNING, "response received for stream ID " + streamId + " but found no promise"); + return; } - } - - @Override - public void headersReceived(Integer streamId, HttpHeaders httpHeaders) { Request request = fromStreamId(streamId); if (request != null) { - HttpHeadersListener httpHeadersListener = request.getHeadersListener(); - if (httpHeadersListener != null) { - httpHeadersListener.onHeaders(httpHeaders); + HttpResponseListener responseListener = request.getResponseListener(); + if (responseListener != null) { + responseListener.onResponse(fullHttpResponse); } - CookieListener cookieListener = request.getCookieListener(); - if (cookieListener != null) { - for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { - Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); - cookieListener.onCookie(cookie); + try { + Request retryRequest = retry(request, fullHttpResponse); + if (retryRequest != null) { + // retry transport, wait for completion + client.retry(this, retryRequest); + } else { + Request continueRequest = continuation(request, fullHttpResponse); + if (continueRequest != null) { + // continue with new transport, synchronous call here, wait for completion + client.continuation(this, continueRequest); + } } + } catch (URLSyntaxException | IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); } } + promise.complete(true); } @Override @@ -134,16 +115,25 @@ public class Http2Transport extends BaseTransport implements Transport { } @Override - public void awaitResponse(Integer streamId) { + public void awaitResponse(Integer streamId) throws IOException { if (streamId == null) { return; } + if (throwable != null) { + return; + } CompletableFuture promise = streamidPromiseMap.get(streamId); if (promise != null) { try { - promise.get(client.getTimeout(), TimeUnit.MILLISECONDS); + long millis = client.getClientConfig().getReadTimeoutMillis(); + Request request = fromStreamId(streamId); + if (request != null && request.getTimeoutInMillis() > 0) { + millis = request.getTimeoutInMillis(); + } + promise.get(millis, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.log(Level.WARNING, "streamId=" + streamId + " " + e.getMessage(), e); + this.throwable = e; + throw new IOException(e); } finally { streamidPromiseMap.remove(streamId); } @@ -153,7 +143,14 @@ public class Http2Transport extends BaseTransport implements Transport { @Override public Transport get() { for (Integer streamId : streamidPromiseMap.keySet()) { - awaitResponse(streamId); + try { + awaitResponse(streamId); + } catch (IOException e) { + notifyRequest(streamId, e); + } + } + if (throwable != null) { + streamidPromiseMap.clear(); } return this; } @@ -165,10 +162,32 @@ public class Http2Transport extends BaseTransport implements Transport { } } + /** + * The underlying network layer failed, not possible to know the request. + * So we fail all (open) promises. + * @param throwable the exception + */ @Override public void fail(Throwable throwable) { + // fail fast, do not fail more than once + if (this.throwable != null) { + return; + } + this.throwable = throwable; for (CompletableFuture promise : streamidPromiseMap.values()) { promise.completeExceptionally(throwable); } } + + /** + * Try to notify request about failure. + * @param streamId stream ID + * @param throwable the exception + */ + private void notifyRequest(Integer streamId, Throwable throwable) { + Request request = fromStreamId(streamId); + if (request != null && request.getCompletableFuture() != null) { + request.getCompletableFuture().completeExceptionally(throwable); + } + } } diff --git a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java index 1356be4..94b0dd2 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java @@ -2,18 +2,12 @@ package org.xbib.netty.http.client.transport; import io.netty.channel.Channel; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.cookie.ClientCookieDecoder; -import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import org.xbib.net.URLSyntaxException; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.listener.CookieListener; -import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpResponseListener; import java.io.IOException; @@ -27,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; -public class HttpTransport extends BaseTransport implements Transport { +public class HttpTransport extends BaseTransport { private static final Logger logger = Logger.getLogger(HttpTransport.class.getName()); @@ -43,7 +37,7 @@ public class HttpTransport extends BaseTransport implements Transport { @Override public Integer nextStream() { - Integer streamId = sequentialCounter.getAndAdd(1); + Integer streamId = sequentialCounter.getAndIncrement(); if (streamId == Integer.MIN_VALUE) { // reset if overflow, Java wraps atomic integers to Integer.MIN_VALUE sequentialCounter.set(0); @@ -71,9 +65,18 @@ public class HttpTransport extends BaseTransport implements Transport { } } try { - request = continuation(null, fullHttpResponse); - if (request != null) { - client.continuation(this, request); + Request retryRequest = retry(request, fullHttpResponse); + if (retryRequest != null) { + // retry transport, wait for completion + client.retry(this, retryRequest); + retryRequest.close(); + } else { + Request continueRequest = continuation(request, fullHttpResponse); + if (continueRequest != null) { + // continue with new transport, synchronous call here, wait for completion + client.continuation(this, continueRequest); + continueRequest.close(); + } } } catch (URLSyntaxException | IOException e) { logger.log(Level.WARNING, e.getMessage(), e); @@ -86,40 +89,25 @@ public class HttpTransport extends BaseTransport implements Transport { } } - @Override - public void headersReceived(Integer streamId, HttpHeaders httpHeaders) { - Request request = fromStreamId(streamId); - if (request != null) { - HttpHeadersListener httpHeadersListener = request.getHeadersListener(); - if (httpHeadersListener != null) { - httpHeadersListener.onHeaders(httpHeaders); - } - for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { - Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); - addCookie(cookie); - CookieListener cookieListener = request.getCookieListener(); - if (cookieListener != null) { - cookieListener.onCookie(cookie); - } - } - } - } - @Override public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) { } @Override - public void awaitResponse(Integer streamId) { + public void awaitResponse(Integer streamId) throws IOException { if (streamId == null) { return; } + if (throwable != null) { + return; + } CompletableFuture promise = sequentialPromiseMap.get(streamId); if (promise != null) { try { - promise.get(client.getTimeout(), TimeUnit.MILLISECONDS); + promise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.log(Level.WARNING, "streamId=" + streamId + " " + e.getMessage(), e); + this.throwable = e; + throw new IOException(e); } finally { sequentialPromiseMap.remove(streamId); } @@ -128,8 +116,15 @@ public class HttpTransport extends BaseTransport implements Transport { @Override public Transport get() { - for (Integer streamId : sequentialPromiseMap.keySet()) { - awaitResponse(streamId); + try { + for (Integer streamId : sequentialPromiseMap.keySet()) { + awaitResponse(streamId); + client.releaseChannel(channel); + } + } catch (IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } finally { + sequentialPromiseMap.clear(); } return this; } @@ -143,9 +138,9 @@ public class HttpTransport extends BaseTransport implements Transport { @Override public void fail(Throwable throwable) { + this.throwable = throwable; for (CompletableFuture promise : sequentialPromiseMap.values()) { promise.completeExceptionally(throwable); } } - } diff --git a/src/main/java/org/xbib/netty/http/client/transport/Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Transport.java index 86aec30..0fca5c1 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/Transport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/Transport.java @@ -7,7 +7,6 @@ import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import io.netty.util.AttributeKey; -import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; import java.io.IOException; @@ -19,8 +18,6 @@ public interface Transport { AttributeKey TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport"); - HttpAddress httpAddress(); - Transport execute(Request request) throws IOException; CompletableFuture execute(Request request, Function supplier) throws IOException; @@ -41,7 +38,7 @@ public interface Transport { void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers); - void awaitResponse(Integer streamId); + void awaitResponse(Integer streamId) throws IOException; Transport get(); @@ -49,5 +46,9 @@ public interface Transport { void fail(Throwable throwable); - void close(); + boolean isFailed(); + + Throwable getFailure(); + + void close() throws IOException; } diff --git a/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java b/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java index 5ffca22..9744621 100644 --- a/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java +++ b/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java @@ -395,10 +395,8 @@ public class NetworkUtils { if (predicate.test(networkInterface)) { networkInterfaces.add(networkInterface); Enumeration subInterfaces = networkInterface.getSubInterfaces(); - if (subInterfaces.hasMoreElements()) { - while (subInterfaces.hasMoreElements()) { - networkInterfaces.add(subInterfaces.nextElement()); - } + while (subInterfaces.hasMoreElements()) { + networkInterfaces.add(subInterfaces.nextElement()); } } } diff --git a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java index 8aa5261..e198e36 100644 --- a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java @@ -26,7 +26,7 @@ public class CompletableFutureTest { final Function httpResponseStringFunction = response -> response.content().toString(StandardCharsets.UTF_8); Request request = Request.get() - .url("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml") + .url("https://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1") .build(); CompletableFuture completableFuture = client.execute(request, httpResponseStringFunction) .exceptionally(Throwable::getMessage) diff --git a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java index 3e974dc..9ccbe50 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java @@ -17,15 +17,14 @@ public class ConscryptTest extends LoggingBase { @Test public void testConscrypt() throws IOException { Client client = Client.builder() - .enableDebug() .setJdkSslProvider() .setSslContextProvider(Conscrypt.newProvider()) .build(); logger.log(Level.INFO, client.getClientConfig().toString()); try { Request request = Request.get() - .url("https://fl-test.hbz-nrw.de") - .setVersion("HTTP/2.0") + .url("https://xbib.org") + .setVersion("HTTP/1.1") .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); diff --git a/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java b/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java index d0970b2..5f2e788 100644 --- a/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java @@ -26,7 +26,7 @@ public class CookieSetterHttpBinTest extends LoggingBase { * } * } * - * @throws Exception + * @throws IOException if test fails */ @Test public void testHttpBinCookies() throws IOException { diff --git a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java index 7e53223..b4f6f97 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java @@ -83,7 +83,7 @@ public class ElasticsearchTest extends LoggingBase { .build() .setResponseListener(fullHttpResponse -> logger.log(Level.FINE, "status = " + fullHttpResponse.status() + - " counter = " + count.incrementAndGet() + + " counter = " + count.getAndIncrement() + " response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8))); } diff --git a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java index f01e312..09f51d4 100644 --- a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java +++ b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java @@ -1,24 +1,34 @@ package org.xbib.netty.http.client.test; import io.netty.handler.codec.http.HttpMethod; +import org.junit.After; +import org.junit.Ignore; import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; -public class Http1Test { +public class Http1Test extends LoggingBase { private static final Logger logger = Logger.getLogger(Http1Test.class.getName()); + @After + public void checkThreads() { + Set threadSet = Thread.getAllStackTraces().keySet(); + logger.log(Level.INFO, "threads = " + threadSet.size() ); + threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString())); + } + @Test public void testHttp1() throws Exception { Client client = new Client(); try { - Request request = Request.get().url("http://fl.hbz-nrw.de").build() + Request request = Request.get().url("http://xbib.org").build() .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + msg.headers().entries() + msg.content().toString(StandardCharsets.UTF_8) + @@ -30,26 +40,29 @@ public class Http1Test { } @Test - public void testHttp1ParallelRequests() throws IOException { - Client client = new Client(); + @Ignore + public void testParallelRequests() throws IOException { + Client client = Client.builder().enableDebug().build(); try { Request request1 = Request.builder(HttpMethod.GET) - .url("http://fl.hbz-nrw.de").setVersion("HTTP/1.1") + .url("http://xbib.org").setVersion("HTTP/1.1") .build() .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + msg.headers().entries() + //msg.content().toString(StandardCharsets.UTF_8) + " status=" + msg.status().code())); Request request2 = Request.builder(HttpMethod.GET) - .url("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1") + .url("http://xbib.org").setVersion("HTTP/1.1") .build() .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + msg.headers().entries() + //msg.content().toString(StandardCharsets.UTF_8) + " status=" + msg.status().code())); - client.execute(request1); - client.execute(request2); + for (int i = 0; i < 10; i++) { + client.execute(request1); + client.execute(request2); + } } finally { client.shutdownGracefully(); @@ -57,7 +70,8 @@ public class Http1Test { } @Test - public void testTwoTransports() throws Exception { + @Ignore + public void testSequentialRequests() throws Exception { Client client = Client.builder().enableDebug().build(); try { Request request1 = Request.get().url("http://xbib.org").build() diff --git a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java index 1c2d2ac..94c4cad 100644 --- a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java +++ b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java @@ -11,25 +11,39 @@ import java.nio.charset.StandardCharsets; import java.util.logging.Level; import java.util.logging.Logger; -public class Http2Test { +public class Http2Test extends LoggingBase { private static final Logger logger = Logger.getLogger(Http2Test.class.getName()); /** + * Problems with akamai: + * + * 2018-03-07 16:02:52.385 FEIN [client] io.netty.handler.codec.http2.Http2FrameLogger logRstStream + * [id: 0x57cc65bb, L:/10.1.1.94:52834 - R:http2.akamai.com/104.94.191.203:443] INBOUND RST_STREAM: streamId=2 errorCode=8 + * 2018-03-07 16:02:52.385 FEIN [client] io.netty.handler.codec.http2.Http2FrameLogger logGoAway + * [id: 0x57cc65bb, L:/10.1.1.94:52834 - R:http2.akamai.com/104.94.191.203:443] OUTBOUND GO_AWAY: lastStreamId=2 errorCode=0 length=0 bytes= + * + * demo/h2_demo_frame.html sends no content, only a push promise, and does not continue + * + * @throws IOException */ @Test + @Ignore public void testAkamai() throws IOException { - Client client = Client.builder().enableDebug().build(); + Client client = Client.builder() + .enableDebug() + .addServerNameForIdentification("http2.akamai.com") + .build(); try { Request request = Request.get() .url("https://http2.akamai.com/demo/h2_demo_frame.html") //.url("https://http2.akamai.com/") .setVersion("HTTP/2.0") .build() - .setResponseListener(fullHttpResponse -> { - String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); - logger.log(Level.INFO, "status = " + fullHttpResponse.status() - + " response body = " + response); + .setResponseListener(msg -> { + String response = msg.content().toString(StandardCharsets.UTF_8); + logger.log(Level.INFO, "status = " + msg.status() + + msg.headers().entries() + " " + response); }); client.execute(request).get(); } finally { @@ -55,17 +69,18 @@ public class Http2Test { @Test public void testHttp2PushIO() throws IOException { - //String url = "https://webtide.com"; String url = "https://http2-push.io"; - // TODO register push announces into promises in order to wait for them all. - Client client = Client.builder().enableDebug().build(); + Client client = Client.builder() + .enableDebug() + .addServerNameForIdentification("http2-push.io") + .build(); try { Request request = Request.builder(HttpMethod.GET) .url(url).setVersion("HTTP/2.0") .build() .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + msg.headers().entries() + - msg.content().toString(StandardCharsets.UTF_8) + + //msg.content().toString(StandardCharsets.UTF_8) + " status=" + msg.status().code())); client.execute(request).get(); @@ -75,7 +90,7 @@ public class Http2Test { } @Test - public void testWebtideTwoRequestsOnSameConnection() { + public void testWebtideTwoRequestsOnSameConnection() throws IOException { Client client = new Client(); try { Request request1 = Request.builder(HttpMethod.GET) @@ -95,8 +110,6 @@ public class Http2Test { " status=" + msg.status().code())); client.execute(request1).execute(request2); - } catch (IOException e) { - // } finally { client.shutdownGracefully(); } diff --git a/src/test/java/org/xbib/netty/http/client/test/LeakTest.java b/src/test/java/org/xbib/netty/http/client/test/LeakTest.java new file mode 100644 index 0000000..b8361d0 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/LeakTest.java @@ -0,0 +1,28 @@ +package org.xbib.netty.http.client.test; + +import org.junit.After; +import org.junit.Test; +import org.xbib.netty.http.client.Client; + +import java.io.IOException; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class LeakTest { + + private static final Logger logger = Logger.getLogger(LeakTest.class.getName()); + + @After + public void checkThreads() { + Set threadSet = Thread.getAllStackTraces().keySet(); + logger.log(Level.INFO, "threads = " + threadSet.size() ); + threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString())); + } + + @Test + public void testForLeaks() throws IOException, InterruptedException { + Client client = new Client(); + client.shutdownGracefully(); + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java b/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java index ad57a4e..e734d15 100644 --- a/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java +++ b/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java @@ -17,7 +17,7 @@ public class LoggingBase { Handler handler = new ConsoleHandler(); handler.setFormatter(new SimpleFormatter()); rootLogger.addHandler(handler); - rootLogger.setLevel(Level.INFO); + rootLogger.setLevel(Level.ALL); for (Handler h : rootLogger.getHandlers()) { handler.setFormatter(new SimpleFormatter()); h.setLevel(Level.ALL); diff --git a/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java b/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java new file mode 100644 index 0000000..1ef9c34 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java @@ -0,0 +1,65 @@ +package org.xbib.netty.http.client.test; + +import org.junit.Test; +import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.HttpAddress; +import org.xbib.netty.http.client.Request; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class PooledClientTest extends LoggingBase { + + private static final Logger logger = Logger.getLogger(""); + + @Test + public void testPooledClientWithSingleNode() throws IOException { + int loop = 10; + HttpAddress httpAddress = HttpAddress.http1("xbib.org", 80); + Client client = Client.builder() + .addPoolNode(httpAddress) + .setPoolSecure(httpAddress.isSecure()) + .setPoolNodeConnectionLimit(16) + .build(); + AtomicInteger count = new AtomicInteger(); + try { + int threads = 16; + ExecutorService executorService = Executors.newFixedThreadPool(threads); + for (int n = 0; n < threads; n++) { + executorService.submit(() -> { + try { + logger.log(Level.INFO, "starting " + Thread.currentThread()); + for (int i = 0; i < loop; i++) { + Request request = Request.get() + .url("http://xbib.org/repository/") + .setVersion("HTTP/1.1") + .build() + .setResponseListener(fullHttpResponse -> { + String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); + //logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); + }); + client.pooledExecute(request).get(); + count.getAndIncrement(); + } + logger.log(Level.INFO, "done " + Thread.currentThread()); + } catch (IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } finally { + client.shutdownGracefully(); + } + logger.log(Level.INFO, "count = " + count.get()); + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/XbibTest.java b/src/test/java/org/xbib/netty/http/client/test/XbibTest.java index 4584329..cb011cb 100644 --- a/src/test/java/org/xbib/netty/http/client/test/XbibTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/XbibTest.java @@ -88,11 +88,11 @@ public class XbibTest extends LoggingBase { @Test public void testXbibOrgWithVeryShortReadTimeout() throws IOException { Client httpClient = Client.builder() - .setReadTimeoutMillis(50) .build(); try { httpClient.execute(Request.get() .url("http://xbib.org") + .setTimeoutInMillis(10) .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java index 8c04cf4..658b8e1 100644 --- a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java @@ -13,13 +13,14 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpVersion; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.pool.Pool; -import org.xbib.netty.http.client.pool.SimpleChannelPool; +import org.xbib.netty.http.client.pool.BoundedChannelPool; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; import java.util.logging.Logger; +@Ignore public class EpollTest { private static final Logger logger = Logger.getLogger(EpollTest.class.getName()); @@ -74,7 +76,8 @@ public class EpollTest { .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true); - channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0); + channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,false, + NODES, bootstrap, null, 0); channelPool.prepare(CONCURRENCY); } @@ -85,7 +88,6 @@ public class EpollTest { mockEpollServer.close(); } - @Ignore @Test public void testPoolEpoll() throws Exception { LongAdder longAdder = new LongAdder(); diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java index e65e767..5be5cbc 100644 --- a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java @@ -12,12 +12,13 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpVersion; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.pool.Pool; -import org.xbib.netty.http.client.pool.SimpleChannelPool; +import org.xbib.netty.http.client.pool.BoundedChannelPool; import java.util.Collections; import java.util.List; @@ -72,7 +73,8 @@ public class NioTest { .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true); - channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0); + channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,false, + NODES, bootstrap, null, 0); channelPool.prepare(CONCURRENCY); } diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java similarity index 91% rename from src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java rename to src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java index 9351230..dcd6738 100644 --- a/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java @@ -2,6 +2,14 @@ package org.xbib.netty.http.client.test.pool; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.AttributeKey; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.xbib.netty.http.client.HttpAddress; +import org.xbib.netty.http.client.pool.BoundedChannelPool; +import org.xbib.netty.http.client.pool.Pool; import java.util.ArrayList; import java.util.Arrays; @@ -17,21 +25,13 @@ import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; import java.util.logging.Logger; -import io.netty.util.AttributeKey; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.xbib.netty.http.client.HttpAddress; -import org.xbib.netty.http.client.pool.Pool; -import org.xbib.netty.http.client.pool.SimpleChannelPool; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) -public class SimplePoolTest { +public class PoolTest { - private static final Logger logger = Logger.getLogger(SimplePoolTest.class.getName()); + private static final Logger logger = Logger.getLogger(PoolTest.class.getName()); private static final int TEST_STEP_TIME_SECONDS = 50; @@ -51,14 +51,14 @@ public class SimplePoolTest { }); } - public SimplePoolTest(int concurrencyLevel, int nodeCount) { + public PoolTest(int concurrencyLevel, int nodeCount) { this.nodeCount = nodeCount; List nodes = new ArrayList<>(); for (int i = 0; i < nodeCount; i ++) { nodes.add(HttpAddress.http1("localhost" + i)); } - try (Pool pool = new SimpleChannelPool<>(new Semaphore(concurrencyLevel), nodes, new Bootstrap(), - null, 0)) { + try (Pool pool = new BoundedChannelPool<>(new Semaphore(concurrencyLevel), HttpVersion.HTTP_1_1, false, + nodes, new Bootstrap(), null, 0)) { int n = Runtime.getRuntime().availableProcessors(); ExecutorService executorService = Executors.newFixedThreadPool(n); for(int i = 0; i < n; i ++) { diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java b/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java index 3b8e4c4..17621d8 100644 --- a/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java @@ -3,7 +3,6 @@ package org.xbib.netty.http.client.test.retry; import org.junit.Test; import org.xbib.netty.http.client.retry.BackOff; import org.xbib.netty.http.client.retry.ExponentialBackOff; -import org.xbib.netty.http.client.retry.NanoClock; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -136,7 +135,7 @@ public class ExponentialBackOffTest { assertEquals(testMaxInterval, backOffPolicy.getCurrentIntervalMillis()); } - static class MyNanoClock implements NanoClock { + static class MyNanoClock implements ExponentialBackOff.NanoClock { private int i = 0; private long startSeconds; diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java index 9fa01f6..e72c4db 100644 --- a/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java +++ b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java @@ -23,11 +23,13 @@ public class MockBackOff implements BackOff { /** Number of tries so far. */ private int numTries; - public void reset() throws IOException { + @Override + public void reset() { numTries = 0; } - public long nextBackOffMillis() throws IOException { + @Override + public long nextBackOffMillis() { if (numTries >= maxTries || backOffMillis == STOP) { return STOP; } diff --git a/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java b/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java index 5065847..fea4833 100644 --- a/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java +++ b/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java @@ -1,11 +1,9 @@ package org.xbib.netty.http.client.test.simple; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; @@ -20,11 +18,8 @@ import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http2.Http2Settings; -import io.netty.handler.codec.http2.HttpConversionUtil; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; import io.netty.util.AttributeKey; +import org.junit.After; import org.junit.Test; import java.io.IOException; @@ -32,6 +27,7 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -46,17 +42,26 @@ public class SimpleHttp1Test { private static final Logger logger = Logger.getLogger(SimpleHttp1Test.class.getName()); + @After + public void checkThreads() { + Set threadSet = Thread.getAllStackTraces().keySet(); + logger.log(Level.INFO, "threads = " + threadSet.size() ); + threadSet.forEach( thread -> { + if (thread.getName().equals("ObjectCleanerThread")) { + logger.log(Level.INFO, thread.toString()); + } + }); + } + @Test public void testHttp1() throws Exception { Client client = new Client(); try { - HttpTransport transport = client.newTransport("fl.hbz-nrw.de", 80); + HttpTransport transport = client.newTransport("xbib.org", 80); transport.onResponse(string -> logger.log(Level.INFO, "got messsage: " + string)); transport.connect(); - transport.awaitSettings(); sendRequest(transport); - transport.awaitResponses(); - transport.close(); + transport.awaitResponse(); } finally { client.shutdown(); } @@ -67,20 +72,18 @@ public class SimpleHttp1Test { if (channel == null) { return; } - Integer streamId = transport.nextStream(); String host = transport.inetSocketAddress().getHostString(); int port = transport.inetSocketAddress().getPort(); - String uri = "https://" + host + ":" + port; + String uri = "http://" + host + ":" + port; FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); request.headers().add(HttpHeaderNames.HOST, host + ":" + port); request.headers().add(HttpHeaderNames.USER_AGENT, "Java"); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE); - if (streamId != null) { - request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); - } logger.log(Level.INFO, () -> "writing request = " + request); - channel.writeAndFlush(request); + if (channel.isWritable()) { + channel.writeAndFlush(request); + } } private AttributeKey TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport"); @@ -115,16 +118,14 @@ public class SimpleHttp1Test { return bootstrap; } - Initializer initializer() { - return initializer; - } - - HttpResponseHandler responseHandler() { - return httpResponseHandler; - } - void shutdown() { + close(); eventLoopGroup.shutdownGracefully(); + try { + eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } } HttpTransport newTransport(String host, int port) { @@ -133,18 +134,11 @@ public class SimpleHttp1Test { return transport; } - List transports() { - return transports; - } - - void close(HttpTransport transport) { - transports.remove(transport); - } - - void close() { + synchronized void close() { for (HttpTransport transport : transports) { transport.close(); } + transports.clear(); } } @@ -165,10 +159,6 @@ public class SimpleHttp1Test { this.inetSocketAddress = inetSocketAddress; } - Client client() { - return client; - } - InetSocketAddress inetSocketAddress() { return inetSocketAddress; } @@ -176,52 +166,33 @@ public class SimpleHttp1Test { void connect() throws InterruptedException { channel = client.bootstrap().connect(inetSocketAddress).sync().await().channel(); channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this); + promise = new CompletableFuture<>(); } Channel channel() { return channel; } - Integer nextStream() { - promise = new CompletableFuture<>(); - return null; - } - void onResponse(ResponseWriter responseWriter) { this.responseWriter = responseWriter; } - void settingsReceived(Channel channel, Http2Settings http2Settings) { - } - - void awaitSettings() { - } - - void responseReceived(Integer streamId, String message) { - if (promise == null) { - logger.log(Level.WARNING, "message received for unknown stream id " + streamId); - } else { - if (responseWriter != null) { - responseWriter.write(message); - } + void responseReceived(FullHttpResponse msg) { + if (responseWriter != null) { + responseWriter.write(msg.content().toString(StandardCharsets.UTF_8)); } } - void awaitResponse(Integer streamId) { + + void awaitResponse() { if (promise != null) { try { - logger.log(Level.INFO, "waiting for response"); promise.get(5, TimeUnit.SECONDS); - logger.log(Level.INFO, "response received"); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.log(Level.WARNING, e.getMessage(), e); } } } - void awaitResponses() { - awaitResponse(null); - } - void complete() { if (promise != null) { promise.complete(true); @@ -238,7 +209,6 @@ public class SimpleHttp1Test { if (channel != null) { channel.close(); } - client.close(this); } } @@ -252,7 +222,6 @@ public class SimpleHttp1Test { @Override protected void initChannel(SocketChannel ch) { - ch.pipeline().addLast(new TrafficLoggingHandler()); ch.pipeline().addLast(new HttpClientCodec()); ch.pipeline().addLast(new HttpObjectAggregator(1048576)); ch.pipeline().addLast(httpResponseHandler); @@ -265,7 +234,7 @@ public class SimpleHttp1Test { protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { HttpTransport transport = ctx.channel().attr(TRANSPORT_ATTRIBUTE_KEY).get(); if (msg.content().isReadable()) { - transport.responseReceived(null, msg.content().toString(StandardCharsets.UTF_8)); + transport.responseReceived(msg); } } @@ -290,35 +259,4 @@ public class SimpleHttp1Test { ctx.channel().close(); } } - - class TrafficLoggingHandler extends LoggingHandler { - - TrafficLoggingHandler() { - super("client", LogLevel.INFO); - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) { - ctx.fireChannelRegistered(); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) { - ctx.fireChannelUnregistered(); - } - - @Override - public void flush(ChannelHandlerContext ctx) { - ctx.flush(); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) { - ctx.write(msg, promise); - } else { - super.write(ctx, msg, promise); - } - } - } }