From 8593996f2bfb19a4b9e2f2f5d819b9409da0fda3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=CC=88rg=20Prante?=
Date: Thu, 8 Mar 2018 15:57:17 +0100
Subject: [PATCH] add pool implementation, back-off for retry, cleaning up
---
build.gradle | 115 ++++++-
gradle.properties | 4 +-
gradle/ext.gradle | 9 -
gradle/publish.gradle | 72 ----
gradle/sonarqube.gradle | 39 ---
.../org/xbib/netty/http/client/Client.java | 312 +++++++++++++++---
.../xbib/netty/http/client/ClientBuilder.java | 58 +++-
.../xbib/netty/http/client/ClientConfig.java | 205 +++++++++---
.../org/xbib/netty/http/client/Request.java | 57 +++-
.../netty/http/client/RequestBuilder.java | 39 ++-
.../http/client/handler/UserEventLogger.java | 30 --
.../handler/http1/HttpChannelInitializer.java | 78 ++---
.../{ => http1}/TrafficLoggingHandler.java | 6 +-
.../http2/Http2ChannelInitializer.java | 102 +++---
...annelPool.java => BoundedChannelPool.java} | 233 +++++++------
.../netty/http/client/rest/RestClient.java | 8 +-
.../xbib/netty/http/client/retry/BackOff.java | 4 +-
.../http/client/retry/ExponentialBackOff.java | 77 ++++-
.../netty/http/client/retry/NanoClock.java | 44 ---
.../http/client/transport/BaseTransport.java | 202 +++++++++---
.../http/client/transport/Http2Transport.java | 115 ++++---
.../http/client/transport/HttpTransport.java | 67 ++--
.../http/client/transport/Transport.java | 11 +-
.../netty/http/client/util/NetworkUtils.java | 6 +-
.../client/test/CompletableFutureTest.java | 2 +-
.../netty/http/client/test/ConscryptTest.java | 5 +-
.../client/test/CookieSetterHttpBinTest.java | 2 +-
.../http/client/test/ElasticsearchTest.java | 2 +-
.../netty/http/client/test/Http1Test.java | 32 +-
.../netty/http/client/test/Http2Test.java | 39 ++-
.../xbib/netty/http/client/test/LeakTest.java | 28 ++
.../netty/http/client/test/LoggingBase.java | 2 +-
.../http/client/test/PooledClientTest.java | 65 ++++
.../xbib/netty/http/client/test/XbibTest.java | 2 +-
.../http/client/test/pool/EpollTest.java | 8 +-
.../netty/http/client/test/pool/NioTest.java | 6 +-
.../{SimplePoolTest.java => PoolTest.java} | 26 +-
.../test/retry/ExponentialBackOffTest.java | 3 +-
.../http/client/test/retry/MockBackOff.java | 6 +-
.../client/test/simple/SimpleHttp1Test.java | 130 ++------
40 files changed, 1404 insertions(+), 847 deletions(-)
delete mode 100644 src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java
rename src/main/java/org/xbib/netty/http/client/handler/{ => http1}/TrafficLoggingHandler.java (88%)
rename src/main/java/org/xbib/netty/http/client/pool/{SimpleChannelPool.java => BoundedChannelPool.java} (60%)
delete mode 100644 src/main/java/org/xbib/netty/http/client/retry/NanoClock.java
create mode 100644 src/test/java/org/xbib/netty/http/client/test/LeakTest.java
create mode 100644 src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java
rename src/test/java/org/xbib/netty/http/client/test/pool/{SimplePoolTest.java => PoolTest.java} (91%)
diff --git a/build.gradle b/build.gradle
index 1e29b59..3779d54 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2,6 +2,7 @@ import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
plugins {
+ id "com.github.spotbugs" version "1.6.1"
id "org.sonarqube" version "2.6.1"
id "io.codearte.nexus-staging" version "0.11.0"
id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.0"
@@ -23,10 +24,10 @@ printf "Date: %s\nHost: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGradle: %s Groovy: %
apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'signing'
+apply plugin: "com.github.spotbugs"
apply plugin: "io.codearte.nexus-staging"
apply plugin: 'org.xbib.gradle.plugin.asciidoctor'
-
configurations {
alpnagent
asciidoclet
@@ -37,7 +38,7 @@ dependencies {
compile "org.xbib:net-url:${project.property('xbib-net-url.version')}"
compile "io.netty:netty-codec-http2:${project.property('netty.version')}"
compile "io.netty:netty-handler-proxy:${project.property('netty.version')}"
- testCompile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
+ compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}"
testCompile "junit:junit:${project.property('junit.version')}"
@@ -66,7 +67,7 @@ test {
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
}
testLogging {
- showStandardStreams = false
+ showStandardStreams = true
exceptionFormat = 'full'
}
}
@@ -121,5 +122,109 @@ if (project.hasProperty('signing.keyId')) {
}
}
-apply from: 'gradle/ext.gradle'
-apply from: 'gradle/publish.gradle'
+
+spotbugs {
+ effort = "max"
+ reportLevel = "low"
+ //includeFilter = file("findbugs-exclude.xml")
+}
+
+tasks.withType(com.github.spotbugs.SpotBugsTask) {
+ ignoreFailures = true
+ reports {
+ xml.enabled = false
+ html.enabled = true
+ }
+}
+
+sonarqube {
+ properties {
+ property "sonar.projectName", "${project.group} ${project.name}"
+ property "sonar.sourceEncoding", "UTF-8"
+ property "sonar.tests", "src/test/java"
+ property "sonar.scm.provider", "git"
+ property "sonar.junit.reportsPath", "build/test-results/test/"
+ }
+}
+
+ext {
+ user = 'jprante'
+ name = 'netty-http-client'
+ description = 'A java client for Elasticsearch'
+ scmUrl = 'https://github.com/' + user + '/' + name
+ scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
+ scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
+}
+
+
+task xbibUpload(type: Upload) {
+ group = 'publish'
+ configuration = configurations.archives
+ uploadDescriptor = true
+ repositories {
+ if (project.hasProperty("xbibUsername")) {
+ mavenDeployer {
+ configuration = configurations.wagon
+ repository(url: 'sftp://xbib.org/repository') {
+ authentication(userName: xbibUsername, privateKey: xbibPrivateKey)
+ }
+ }
+ }
+ }
+}
+
+task sonaTypeUpload(type: Upload) {
+ group = 'publish'
+ configuration = configurations.archives
+ uploadDescriptor = true
+ repositories {
+ if (project.hasProperty('ossrhUsername')) {
+ mavenDeployer {
+ beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
+ repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') {
+ authentication(userName: ossrhUsername, password: ossrhPassword)
+ }
+ snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') {
+ authentication(userName: ossrhUsername, password: ossrhPassword)
+ }
+ pom.project {
+ groupId project.group
+ artifactId project.name
+ version project.version
+ name project.name
+ description description
+ packaging 'jar'
+ inceptionYear '2012'
+ url scmUrl
+ organization {
+ name 'xbib'
+ url 'http://xbib.org'
+ }
+ developers {
+ developer {
+ id user
+ name 'Jörg Prante'
+ email 'joergprante@gmail.com'
+ url 'https://github.com/jprante'
+ }
+ }
+ scm {
+ url scmUrl
+ connection scmConnection
+ developerConnection scmDeveloperConnection
+ }
+ licenses {
+ license {
+ name 'The Apache License, Version 2.0'
+ url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+nexusStaging {
+ packageGroup = "org.xbib"
+}
diff --git a/gradle.properties b/gradle.properties
index 8bbb631..d0ddc09 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,8 +1,8 @@
group = org.xbib
name = netty-http-client
-version = 4.1.16.0
+version = 4.1.22.2
-netty.version = 4.1.16.Final
+netty.version = 4.1.22.Final
tcnative.version = 2.0.7.Final
conscrypt.version = 1.0.1
xbib-net-url.version = 1.1.0
diff --git a/gradle/ext.gradle b/gradle/ext.gradle
index dcb8f32..e69de29 100644
--- a/gradle/ext.gradle
+++ b/gradle/ext.gradle
@@ -1,9 +0,0 @@
-
-ext {
- user = 'jprante'
- name = 'netty-http-client'
- description = 'A java client for Elasticsearch'
- scmUrl = 'https://github.com/' + user + '/' + name
- scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
- scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
-}
diff --git a/gradle/publish.gradle b/gradle/publish.gradle
index 0935c6f..e69de29 100644
--- a/gradle/publish.gradle
+++ b/gradle/publish.gradle
@@ -1,72 +0,0 @@
-
-task xbibUpload(type: Upload) {
- group = 'publish'
- configuration = configurations.archives
- uploadDescriptor = true
- repositories {
- if (project.hasProperty("xbibUsername")) {
- mavenDeployer {
- configuration = configurations.wagon
- repository(url: 'sftp://xbib.org/repository') {
- authentication(userName: xbibUsername, privateKey: xbibPrivateKey)
- }
- }
- }
- }
-}
-
-task sonaTypeUpload(type: Upload) {
- group = 'publish'
- configuration = configurations.archives
- uploadDescriptor = true
- repositories {
- if (project.hasProperty('ossrhUsername')) {
- mavenDeployer {
- beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
- repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') {
- authentication(userName: ossrhUsername, password: ossrhPassword)
- }
- snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') {
- authentication(userName: ossrhUsername, password: ossrhPassword)
- }
- pom.project {
- groupId project.group
- artifactId project.name
- version project.version
- name project.name
- description description
- packaging 'jar'
- inceptionYear '2012'
- url scmUrl
- organization {
- name 'xbib'
- url 'http://xbib.org'
- }
- developers {
- developer {
- id user
- name 'Jörg Prante'
- email 'joergprante@gmail.com'
- url 'https://github.com/jprante'
- }
- }
- scm {
- url scmUrl
- connection scmConnection
- developerConnection scmDeveloperConnection
- }
- licenses {
- license {
- name 'The Apache License, Version 2.0'
- url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
- }
- }
- }
- }
- }
- }
-}
-
-nexusStaging {
- packageGroup = "org.xbib"
-}
diff --git a/gradle/sonarqube.gradle b/gradle/sonarqube.gradle
index 3985a4f..e69de29 100644
--- a/gradle/sonarqube.gradle
+++ b/gradle/sonarqube.gradle
@@ -1,39 +0,0 @@
-tasks.withType(FindBugs) {
- ignoreFailures = true
- reports {
- xml.enabled = false
- html.enabled = true
- }
-}
-tasks.withType(Pmd) {
- ignoreFailures = true
- reports {
- xml.enabled = true
- html.enabled = true
- }
-}
-tasks.withType(Checkstyle) {
- ignoreFailures = true
- reports {
- xml.enabled = true
- html.enabled = true
- }
-}
-
-jacocoTestReport {
- reports {
- xml.enabled = true
- csv.enabled = false
- }
-}
-
-sonarqube {
- properties {
- property "sonar.projectName", "${project.group} ${project.name}"
- property "sonar.sourceEncoding", "UTF-8"
- property "sonar.tests", "src/test/java"
- property "sonar.scm.provider", "git"
- property "sonar.java.coveragePlugin", "jacoco"
- property "sonar.junit.reportsPath", "build/test-results/test/"
- }
-}
diff --git a/src/main/java/org/xbib/netty/http/client/Client.java b/src/main/java/org/xbib/netty/http/client/Client.java
index 8700e76..e5efefa 100644
--- a/src/main/java/org/xbib/netty/http/client/Client.java
+++ b/src/main/java/org/xbib/netty/http/client/Client.java
@@ -2,38 +2,54 @@ package org.xbib.netty.http.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
+import org.xbib.net.URL;
import org.xbib.netty.http.client.handler.http1.HttpChannelInitializer;
import org.xbib.netty.http.client.handler.http1.HttpResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2ChannelInitializer;
import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2SettingsHandler;
-import org.xbib.netty.http.client.pool.Pool;
-import org.xbib.netty.http.client.pool.SimpleChannelPool;
+import org.xbib.netty.http.client.pool.BoundedChannelPool;
import org.xbib.netty.http.client.transport.Http2Transport;
import org.xbib.netty.http.client.transport.HttpTransport;
import org.xbib.netty.http.client.transport.Transport;
import org.xbib.netty.http.client.util.NetworkUtils;
+import javax.net.ssl.SNIHostName;
+import javax.net.ssl.SNIServerName;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.security.KeyStoreException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -45,7 +61,22 @@ public final class Client {
private static final ThreadFactory httpClientThreadFactory = new HttpClientThreadFactory();
static {
- NetworkUtils.extendSystemProperties();
+ if (System.getProperty("xbib.netty.http.client.extendsystemproperties") != null) {
+ NetworkUtils.extendSystemProperties();
+ }
+ // change Netty defaults to safer ones, but still allow override from arg line
+ if (System.getProperty("io.netty.noUnsafe") == null) {
+ System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
+ }
+ if (System.getProperty("io.netty.noKeySetOptimization") == null) {
+ System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true));
+ }
+ if (System.getProperty("io.netty.recycler.maxCapacity") == null) {
+ System.setProperty("io.netty.recycler.maxCapacity", Integer.toString(0));
+ }
+ if (System.getProperty("io.netty.leakDetection.level") == null) {
+ System.setProperty("io.netty.leakDetection.level", "advanced");
+ }
}
private final ClientConfig clientConfig;
@@ -68,7 +99,7 @@ public final class Client {
private TransportListener transportListener;
- private Pool pool;
+ private BoundedChannelPool pool;
public Client() {
this(new ClientConfig());
@@ -84,40 +115,50 @@ public final class Client {
this.clientConfig = clientConfig;
initializeTrustManagerFactory(clientConfig);
this.byteBufAllocator = byteBufAllocator != null ?
- byteBufAllocator : PooledByteBufAllocator.DEFAULT;
- this.eventLoopGroup = eventLoopGroup != null ?
- eventLoopGroup : new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory);
- this.socketChannelClass = socketChannelClass != null ?
- socketChannelClass : NioSocketChannel.class;
+ byteBufAllocator : ByteBufAllocator.DEFAULT;
+ this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ?
+ new EpollEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory) :
+ new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory);
+ this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ?
+ EpollSocketChannel.class : NioSocketChannel.class;
this.bootstrap = new Bootstrap()
.group(this.eventLoopGroup)
.channel(this.socketChannelClass)
+ //.option(ChannelOption.ALLOCATOR, byteBufAllocator)
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNodelay())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isKeepAlive())
.option(ChannelOption.SO_REUSEADDR, clientConfig.isReuseAddr())
.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
- .option(ChannelOption.ALLOCATOR, byteBufAllocator);
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, clientConfig.getWriteBufferWaterMark());
this.httpResponseHandler = new HttpResponseHandler();
this.http2SettingsHandler = new Http2SettingsHandler();
this.http2ResponseHandler = new Http2ResponseHandler();
this.transports = new CopyOnWriteArrayList<>();
- List nodes = clientConfig.getNodes();
- if (nodes != null && !nodes.isEmpty()) {
- Integer limit = clientConfig.getNodeConnectionLimit();
- if (limit == null || limit > nodes.size()) {
- limit = nodes.size();
- }
- if (limit < 1) {
+ if (hasPooledConnections()) {
+ List nodes = clientConfig.getPoolNodes();
+ Integer limit = clientConfig.getPoolNodeConnectionLimit();
+ if (limit == null || limit < 1) {
limit = 1;
}
Semaphore semaphore = new Semaphore(limit);
- Integer retries = clientConfig.getRetriesPerNode();
+ Integer retries = clientConfig.getRetriesPerPoolNode();
if (retries == null || retries < 0) {
retries = 0;
}
- this.pool = new SimpleChannelPool<>(semaphore, nodes, bootstrap, null, retries);
+ ClientChannelPoolHandler clientChannelPoolHandler = new ClientChannelPoolHandler();
+ this.pool = new BoundedChannelPool<>(semaphore, clientConfig.getPoolVersion(),
+ clientConfig.isPoolSecure(), nodes, bootstrap, clientChannelPoolHandler, retries);
+ Integer nodeConnectionLimit = clientConfig.getPoolNodeConnectionLimit();
+ if (nodeConnectionLimit == null || nodeConnectionLimit == 0) {
+ nodeConnectionLimit = nodes.size();
+ }
+ try {
+ this.pool.prepare(nodeConnectionLimit);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ }
}
}
@@ -133,52 +174,115 @@ public final class Client {
return byteBufAllocator;
}
+ public EventLoopGroup getEventLoopGroup() {
+ return eventLoopGroup;
+ }
+
public void setTransportListener(TransportListener transportListener) {
this.transportListener = transportListener;
}
+ public boolean hasPooledConnections() {
+ return !clientConfig.getPoolNodes().isEmpty();
+ }
+
+ public BoundedChannelPool getPool() {
+ return pool;
+ }
+
public void logDiagnostics(Level level) {
logger.log(level, () -> "OpenSSL available: " + OpenSsl.isAvailable() +
" OpenSSL ALPN support: " + OpenSsl.isAlpnSupported() +
- " Local host name: " + NetworkUtils.getLocalHostName("localhost"));
+ " Local host name: " + NetworkUtils.getLocalHostName("localhost") +
+ " event loop group: " + eventLoopGroup +
+ " socket: " + socketChannelClass.getName() +
+ " allocator: " + byteBufAllocator.getClass().getName());
logger.log(level, NetworkUtils::displayNetworkInterfaces);
}
- public int getTimeout() {
- return clientConfig.getReadTimeoutMillis();
+ public Transport newTransport() {
+ return newTransport(null);
+ }
+
+ public Transport newTransport(URL url, HttpVersion httpVersion) {
+ return newTransport(HttpAddress.of(url, httpVersion));
}
public Transport newTransport(HttpAddress httpAddress) {
- Transport transport;
- if (httpAddress.getVersion().majorVersion() < 2) {
- transport = new HttpTransport(this, httpAddress);
- } else {
- transport = new Http2Transport(this, httpAddress);
+ Transport transport = null;
+ if (httpAddress != null) {
+ if (httpAddress.getVersion().majorVersion() == 1) {
+ transport = new HttpTransport(this, httpAddress);
+ } else {
+ transport = new Http2Transport(this, httpAddress);
+ }
+ } else if (hasPooledConnections()) {
+ if (pool.getVersion().majorVersion() == 1) {
+ transport = new HttpTransport(this, null);
+ } else {
+ transport = new Http2Transport(this, null);
+ }
}
- if (transportListener != null) {
- transportListener.onOpen(transport);
+ if (transport != null) {
+ if (transportListener != null) {
+ transportListener.onOpen(transport);
+ }
+ transports.add(transport);
}
- transports.add(transport);
return transport;
}
- public Channel newChannel(HttpAddress httpAddress) throws InterruptedException {
- HttpVersion httpVersion = httpAddress.getVersion();
- ChannelInitializer initializer;
- Channel channel;
- if (httpVersion.majorVersion() < 2) {
- initializer = new HttpChannelInitializer(clientConfig, httpAddress, httpResponseHandler);
- channel = bootstrap.handler(initializer)
- .connect(httpAddress.getInetSocketAddress()).sync().await().channel();
+ public Channel newChannel(HttpAddress httpAddress) throws IOException {
+ Channel channel = null;
+ if (httpAddress != null) {
+ HttpVersion httpVersion = httpAddress.getVersion();
+ ChannelInitializer initializer;
+ SslHandler sslHandler = newSslHandler(clientConfig, byteBufAllocator, httpAddress);
+ if (httpVersion.majorVersion() == 1) {
+ initializer = new HttpChannelInitializer(clientConfig, httpAddress,
+ sslHandler, httpResponseHandler);
+ } else {
+ initializer = new Http2ChannelInitializer(clientConfig, httpAddress,
+ sslHandler, http2SettingsHandler, http2ResponseHandler);
+ }
+ try {
+ channel = bootstrap.handler(initializer)
+ .connect(httpAddress.getInetSocketAddress()).sync().await().channel();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
} else {
- initializer = new Http2ChannelInitializer(clientConfig, httpAddress,
- http2SettingsHandler, http2ResponseHandler);
- channel = bootstrap.handler(initializer)
- .connect(httpAddress.getInetSocketAddress()).sync().await().channel();
+ if (hasPooledConnections()) {
+ try {
+ channel = pool.acquire();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
}
return channel;
}
+ public Channel newChannel() throws IOException {
+ return newChannel(null);
+ }
+
+ public void releaseChannel(Channel channel) throws IOException{
+ if (hasPooledConnections()) {
+ try {
+ pool.release(channel);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ if (channel != null) {
+ channel.close();
+ }
+ }
+ }
+
public Transport execute(Request request) throws IOException {
Transport transport = newTransport(HttpAddress.of(request));
transport.execute(request);
@@ -190,8 +294,14 @@ public final class Client {
return newTransport(HttpAddress.of(request)).execute(request, supplier);
}
+ public Transport pooledExecute(Request request) throws IOException {
+ Transport transport = newTransport();
+ transport.execute(request);
+ return transport;
+ }
+
/**
- * For following redirects by a chain of transports.
+ * For following redirects, construct a new transport.
* @param transport the previous transport
* @param request the new request for continuing the request.
*/
@@ -203,6 +313,13 @@ public final class Client {
close(nextTransport);
}
+ /**
+ * Retry request by following a back-off strategy.
+ *
+ * @param transport the transport to retry
+ * @param request the request to retry
+ * @throws IOException if retry failed
+ */
public void retry(Transport transport, Request request) throws IOException {
transport.execute(request);
transport.get();
@@ -213,7 +330,7 @@ public final class Client {
return newTransport(HttpAddress.of(request));
}
- public void close(Transport transport) {
+ public void close(Transport transport) throws IOException {
if (transportListener != null) {
transportListener.onClose(transport);
}
@@ -221,21 +338,29 @@ public final class Client {
transports.remove(transport);
}
- public void close() {
+ public void close() throws IOException {
for (Transport transport : transports) {
close(transport);
}
}
- public void shutdown() {
- eventLoopGroup.shutdownGracefully();
- }
-
- public void shutdownGracefully() {
+ public void shutdownGracefully() throws IOException {
+ if (hasPooledConnections()) {
+ pool.close();
+ }
close();
shutdown();
}
+ public void shutdown() {
+ eventLoopGroup.shutdownGracefully();
+ try {
+ eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
/**
* Initialize trust manager factory once per client lifecycle.
* @param clientConfig the client config
@@ -251,6 +376,60 @@ public final class Client {
}
}
+ private static SslHandler newSslHandler(ClientConfig clientConfig, ByteBufAllocator allocator, HttpAddress httpAddress) {
+ try {
+ SslContext sslContext = newSslContext(clientConfig);
+ SslHandler sslHandler = sslContext.newHandler(allocator);
+ SSLEngine engine = sslHandler.engine();
+ List serverNames = clientConfig.getServerNamesForIdentification();
+ if (serverNames.isEmpty()) {
+ serverNames = Collections.singletonList(httpAddress.getInetSocketAddress().getHostName());
+ }
+ SSLParameters params = engine.getSSLParameters();
+ params.setEndpointIdentificationAlgorithm("HTTPS");
+ List sniServerNames = new ArrayList<>();
+ for (String serverName : serverNames) {
+ sniServerNames.add(new SNIHostName(serverName));
+ }
+ params.setServerNames(sniServerNames);
+ engine.setSSLParameters(params);
+ switch (clientConfig.getClientAuthMode()) {
+ case NEED:
+ engine.setNeedClientAuth(true);
+ break;
+ case WANT:
+ engine.setWantClientAuth(true);
+ break;
+ default:
+ break;
+ }
+ return sslHandler;
+ } catch (SSLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private static SslContext newSslContext(ClientConfig clientConfig) throws SSLException {
+ SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
+ .sslProvider(clientConfig.getSslProvider())
+ .ciphers(Http2SecurityUtil.CIPHERS, clientConfig.getCipherSuiteFilter())
+ .applicationProtocolConfig(newApplicationProtocolConfig());
+ if (clientConfig.getSslContextProvider() != null) {
+ sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
+ }
+ if (clientConfig.getTrustManagerFactory() != null) {
+ sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
+ }
+ return sslContextBuilder.build();
+ }
+
+ private static ApplicationProtocolConfig newApplicationProtocolConfig() {
+ return new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
+ ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+ ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+ ApplicationProtocolNames.HTTP_2);
+ }
+
public interface TransportListener {
void onOpen(Transport transport);
@@ -269,4 +448,35 @@ public final class Client {
return thread;
}
}
+
+ class ClientChannelPoolHandler implements ChannelPoolHandler {
+
+ @Override
+ public void channelReleased(Channel channel) {
+ }
+
+ @Override
+ public void channelAcquired(Channel channel) {
+ }
+
+ @Override
+ public void channelCreated(Channel channel) {
+ HttpAddress httpAddress = channel.attr(pool.getAttributeKey()).get();
+ HttpVersion httpVersion = httpAddress.getVersion();
+ SslHandler sslHandler = newSslHandler(clientConfig, byteBufAllocator, httpAddress);
+ if (httpVersion.majorVersion() == 1) {
+ HttpChannelInitializer initializer = new HttpChannelInitializer(clientConfig, httpAddress,
+ sslHandler, httpResponseHandler);
+ if (channel instanceof SocketChannel) {
+ initializer.initChannel((SocketChannel) channel);
+ }
+ } else {
+ Http2ChannelInitializer initializer = new Http2ChannelInitializer(clientConfig, httpAddress,
+ sslHandler, http2SettingsHandler, http2ResponseHandler);
+ if (channel instanceof SocketChannel) {
+ initializer.initChannel((SocketChannel) channel);
+ }
+ }
+ }
+ }
}
diff --git a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java
index 8727167..bec85c1 100644
--- a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java
+++ b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java
@@ -2,10 +2,14 @@ package org.xbib.netty.http.client;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
@@ -155,11 +159,6 @@ public class ClientBuilder {
return this;
}
- public ClientBuilder setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
- clientConfig.setTrustManagerFactory(trustManagerFactory);
- return this;
- }
-
public ClientBuilder setKeyCert(InputStream keyCertChainInputStream, InputStream keyInputStream) {
clientConfig.setKeyCert(keyCertChainInputStream, keyInputStream);
return this;
@@ -171,8 +170,13 @@ public class ClientBuilder {
return this;
}
- public ClientBuilder setServerNameIdentification(boolean serverNameIdentification) {
- clientConfig.setServerNameIdentification(serverNameIdentification);
+ public ClientBuilder setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
+ clientConfig.setTrustManagerFactory(trustManagerFactory);
+ return this;
+ }
+
+ public ClientBuilder trustInsecure() {
+ clientConfig.setTrustManagerFactory(InsecureTrustManagerFactory.INSTANCE);
return this;
}
@@ -186,6 +190,46 @@ public class ClientBuilder {
return this;
}
+ public ClientBuilder addPoolNode(HttpAddress httpAddress) {
+ clientConfig.addPoolNode(httpAddress);
+ return this;
+ }
+
+ public ClientBuilder setPoolNodeConnectionLimit(int nodeConnectionLimit) {
+ clientConfig.setPoolNodeConnectionLimit(nodeConnectionLimit);
+ return this;
+ }
+
+ public ClientBuilder setRetriesPerPoolNode(int retriesPerNode) {
+ clientConfig.setRetriesPerPoolNode(retriesPerNode);
+ return this;
+ }
+
+ public ClientBuilder setPoolVersion(HttpVersion poolVersion) {
+ clientConfig.setPoolVersion(poolVersion);
+ return this;
+ }
+
+ public ClientBuilder setPoolSecure(boolean poolSecure) {
+ clientConfig.setPoolSecure(poolSecure);
+ return this;
+ }
+
+ public ClientBuilder addServerNameForIdentification(String serverName) {
+ clientConfig.addServerNameForIdentification(serverName);
+ return this;
+ }
+
+ public ClientBuilder setHttp2Settings(Http2Settings http2Settings) {
+ clientConfig.setHttp2Settings(http2Settings);
+ return this;
+ }
+
+ public ClientBuilder setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
+ clientConfig.setWriteBufferWaterMark(writeBufferWaterMark);
+ return this;
+ }
+
public Client build() {
return new Client(clientConfig, byteBufAllocator, eventLoopGroup, socketChannelClass);
}
diff --git a/src/main/java/org/xbib/netty/http/client/ClientConfig.java b/src/main/java/org/xbib/netty/http/client/ClientConfig.java
index d45ae1f..e1677d3 100644
--- a/src/main/java/org/xbib/netty/http/client/ClientConfig.java
+++ b/src/main/java/org/xbib/netty/http/client/ClientConfig.java
@@ -1,15 +1,22 @@
package org.xbib.netty.http.client;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.logging.LogLevel;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import org.xbib.netty.http.client.retry.BackOff;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.Provider;
+import java.util.ArrayList;
import java.util.List;
public class ClientConfig {
@@ -22,7 +29,18 @@ public class ClientConfig {
boolean DEBUG = false;
/**
- * Default for thread count.
+ * Default debug log level.
+ */
+ LogLevel DEFAULT_DEBUG_LOG_LEVEL = LogLevel.DEBUG;
+
+ /**
+ * The default for selecting epoll. If available, select epoll.
+ */
+ boolean EPOLL = Epoll.isAvailable();
+
+ /**
+ * If set to 0, then Netty will decide about thread count.
+ * Default is Runtime.getRuntime().availableProcessors() * 2
*/
int THREAD_COUNT = 0;
@@ -110,12 +128,40 @@ public class ClientConfig {
*/
CipherSuiteFilter CIPHER_SUITE_FILTER = SupportedCipherSuiteFilter.INSTANCE;
- boolean USE_SERVER_NAME_IDENTIFICATION = true;
-
/**
* Default for SSL client authentication.
*/
ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE;
+
+ /**
+ * Default for pool retries per node.
+ */
+ Integer RETRIES_PER_NODE = 0;
+
+ /**
+ * Default pool HTTP version.
+ */
+ HttpVersion POOL_VERSION = HttpVersion.HTTP_1_1;
+
+ /**
+ * Default connection pool security.
+ */
+ Boolean POOL_SECURE = false;
+
+ /**
+ * Default HTTP/2 settings.
+ */
+ Http2Settings HTTP2_SETTINGS = Http2Settings.defaultSettings();
+
+ /**
+ * Default write buffer water mark.
+ */
+ WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = WriteBufferWaterMark.DEFAULT;
+
+ /**
+ * Default for backoff.
+ */
+ BackOff BACK_OFF = BackOff.ZERO_BACKOFF;
}
private static TrustManagerFactory TRUST_MANAGER_FACTORY;
@@ -123,10 +169,6 @@ public class ClientConfig {
static {
try {
TRUST_MANAGER_FACTORY = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- //InsecureTrustManagerFactory.INSTANCE;
- //TRUST_MANAGER_FACTORY.init((KeyStore) null);
- // java.lang.IllegalStateException: TrustManagerFactoryImpl is not initialized
- //TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
} catch (Exception e) {
TRUST_MANAGER_FACTORY = null;
}
@@ -134,10 +176,10 @@ public class ClientConfig {
private boolean debug = Defaults.DEBUG;
- /**
- * If set to 0, then Netty will decide about thread count.
- * Default is Runtime.getRuntime().availableProcessors() * 2
- */
+ private LogLevel debugLogLevel = Defaults.DEFAULT_DEBUG_LOG_LEVEL;
+
+ private boolean epoll = Defaults.EPOLL;
+
private int threadCount = Defaults.THREAD_COUNT;
private boolean tcpNodelay = Defaults.TCP_NODELAY;
@@ -178,8 +220,6 @@ public class ClientConfig {
private KeyStore trustManagerKeyStore = null;
- private boolean serverNameIdentification = Defaults.USE_SERVER_NAME_IDENTIFICATION;
-
private ClientAuthMode clientAuthMode = Defaults.SSL_CLIENT_AUTH_MODE;
private InputStream keyCertChainInputStream;
@@ -190,11 +230,23 @@ public class ClientConfig {
private HttpProxyHandler httpProxyHandler;
- private List nodes;
+ private List poolNodes = new ArrayList<>();
- private Integer nodeConnectionLimit;
+ private Integer poolNodeConnectionLimit;
- private Integer retriesPerNode;
+ private Integer retriesPerPoolNode = Defaults.RETRIES_PER_NODE;
+
+ private HttpVersion poolVersion = Defaults.POOL_VERSION;
+
+ private Boolean poolSecure = Defaults.POOL_SECURE;
+
+ private List serverNamesForIdentification = new ArrayList<>();
+
+ private Http2Settings http2Settings = Defaults.HTTP2_SETTINGS;
+
+ private WriteBufferWaterMark writeBufferWaterMark = Defaults.WRITE_BUFFER_WATER_MARK;
+
+ private BackOff backOff = Defaults.BACK_OFF;
public ClientConfig setDebug(boolean debug) {
this.debug = debug;
@@ -215,6 +267,29 @@ public class ClientConfig {
return debug;
}
+ public ClientConfig setDebugLogLevel(LogLevel debugLogLevel) {
+ this.debugLogLevel = debugLogLevel;
+ return this;
+ }
+
+ public LogLevel getDebugLogLevel() {
+ return debugLogLevel;
+ }
+
+ public ClientConfig enableEpoll() {
+ this.epoll = true;
+ return this;
+ }
+
+ public ClientConfig disableEpoll() {
+ this.epoll = false;
+ return this;
+ }
+
+ public boolean isEpoll() {
+ return epoll;
+ }
+
public ClientConfig setThreadCount(int threadCount) {
this.threadCount = threadCount;
return this;
@@ -341,6 +416,15 @@ public class ClientConfig {
return enableGzip;
}
+ public ClientConfig setHttp2Settings(Http2Settings http2Settings) {
+ this.http2Settings = http2Settings;
+ return this;
+ }
+
+ public Http2Settings getHttp2Settings() {
+ return http2Settings;
+ }
+
public ClientConfig setSslProvider(SslProvider sslProvider) {
this.sslProvider = sslProvider;
return this;
@@ -413,15 +497,6 @@ public class ClientConfig {
return keyPassword;
}
- public ClientConfig setServerNameIdentification(boolean serverNameIdentification) {
- this.serverNameIdentification = serverNameIdentification;
- return this;
- }
-
- public boolean isServerNameIdentification() {
- return serverNameIdentification;
- }
-
public ClientConfig setClientAuthMode(ClientAuthMode clientAuthMode) {
this.clientAuthMode = clientAuthMode;
return this;
@@ -458,31 +533,81 @@ public class ClientConfig {
return httpProxyHandler;
}
- public ClientConfig setNodes(List nodes) {
- this.nodes = nodes;
+ public ClientConfig setPoolNodes(List poolNodes) {
+ this.poolNodes = poolNodes;
return this;
}
- public List getNodes() {
- return nodes;
+ public List getPoolNodes() {
+ return poolNodes;
}
- public ClientConfig setNodeConnectionLimit(Integer nodeConnectionLimit) {
- this.nodeConnectionLimit = nodeConnectionLimit;
+ public ClientConfig addPoolNode(HttpAddress poolNodeAddress) {
+ this.poolNodes.add(poolNodeAddress);
return this;
}
- public Integer getNodeConnectionLimit() {
- return nodeConnectionLimit;
- }
-
- public ClientConfig setRetriesPerNode(Integer retriesPerNode) {
- this.retriesPerNode = retriesPerNode;
+ public ClientConfig setPoolNodeConnectionLimit(Integer poolNodeConnectionLimit) {
+ this.poolNodeConnectionLimit = poolNodeConnectionLimit;
return this;
}
- public Integer getRetriesPerNode() {
- return retriesPerNode;
+ public Integer getPoolNodeConnectionLimit() {
+ return poolNodeConnectionLimit;
+ }
+
+ public ClientConfig setRetriesPerPoolNode(Integer retriesPerPoolNode) {
+ this.retriesPerPoolNode = retriesPerPoolNode;
+ return this;
+ }
+
+ public Integer getRetriesPerPoolNode() {
+ return retriesPerPoolNode;
+ }
+
+ public ClientConfig setPoolVersion(HttpVersion poolVersion) {
+ this.poolVersion = poolVersion;
+ return this;
+ }
+
+ public HttpVersion getPoolVersion() {
+ return poolVersion;
+ }
+
+ public ClientConfig setPoolSecure(boolean poolSecure) {
+ this.poolSecure = poolSecure;
+ return this;
+ }
+
+ public boolean isPoolSecure() {
+ return poolSecure;
+ }
+
+ public ClientConfig addServerNameForIdentification(String serverNameForIdentification) {
+ this.serverNamesForIdentification.add(serverNameForIdentification);
+ return this;
+ }
+
+ public List getServerNamesForIdentification() {
+ return serverNamesForIdentification;
+ }
+
+ public ClientConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
+ this.writeBufferWaterMark = writeBufferWaterMark;
+ return this;
+ }
+
+ public WriteBufferWaterMark getWriteBufferWaterMark() {
+ return writeBufferWaterMark;
+ }
+
+ public ClientConfig setBackOff(BackOff backOff) {
+ this.backOff = backOff;
+ return this;
+ }
+
+ public BackOff getBackOff() {
+ return backOff;
}
@Override
@@ -491,7 +616,5 @@ public class ClientConfig {
sb.append("SSL=").append(sslProvider)
.append(",SSL context provider=").append(sslContextProvider != null ? sslContextProvider.getName() : "");
return sb.toString();
-
-
}
}
diff --git a/src/main/java/org/xbib/netty/http/client/Request.java b/src/main/java/org/xbib/netty/http/client/Request.java
index cfe76e2..d593115 100644
--- a/src/main/java/org/xbib/netty/http/client/Request.java
+++ b/src/main/java/org/xbib/netty/http/client/Request.java
@@ -10,14 +10,18 @@ import org.xbib.net.URL;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
+import org.xbib.netty.http.client.retry.BackOff;
+import java.io.Closeable;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
/**
- *
+ * HTTP client request.
*/
-public class Request {
+public class Request implements Closeable {
private final URL base;
@@ -33,7 +37,7 @@ public class Request {
private final ByteBuf content;
- private final int timeout;
+ private final long timeoutInMillis;
private final boolean followRedirect;
@@ -41,6 +45,12 @@ public class Request {
private int redirectCount;
+ private final boolean isBackOff;
+
+ private final BackOff backOff;
+
+ private CompletableFuture> completableFuture;
+
private HttpResponseListener responseListener;
private HttpHeadersListener headersListener;
@@ -50,7 +60,8 @@ public class Request {
Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod,
HttpHeaders headers, Collection cookies,
String uri, ByteBuf content,
- int timeout, boolean followRedirect, int maxRedirect, int redirectCount) {
+ long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount,
+ boolean isBackOff, BackOff backOff) {
this.base = url;
this.httpVersion = httpVersion;
this.httpMethod = httpMethod;
@@ -58,10 +69,12 @@ public class Request {
this.cookies = cookies;
this.uri = uri;
this.content = content;
- this.timeout = timeout;
+ this.timeoutInMillis = timeoutInMillis;
this.followRedirect = followRedirect;
this.maxRedirects = maxRedirect;
this.redirectCount = redirectCount;
+ this.isBackOff = isBackOff;
+ this.backOff = backOff;
}
public URL base() {
@@ -92,15 +105,27 @@ public class Request {
return content;
}
- public int getTimeout() {
- return timeout;
+ /**
+ * Return the timeout in milliseconds per request. This overrides the read timeout of the client.
+ * @return timeout timeout in milliseconds
+ */
+ public long getTimeoutInMillis() {
+ return timeoutInMillis;
}
public boolean isFollowRedirect() {
return followRedirect;
}
- public boolean checkRedirect() {
+ public boolean isBackOff() {
+ return isBackOff;
+ }
+
+ public BackOff getBackOff() {
+ return backOff;
+ }
+
+ public boolean canRedirect() {
if (!followRedirect) {
return false;
}
@@ -124,6 +149,15 @@ public class Request {
return sb.toString();
}
+ public Request setCompletableFuture(CompletableFuture> completableFuture) {
+ this.completableFuture = completableFuture;
+ return this;
+ }
+
+ public CompletableFuture> getCompletableFuture() {
+ return completableFuture;
+ }
+
public Request setHeadersListener(HttpHeadersListener httpHeadersListener) {
this.headersListener = httpHeadersListener;
return this;
@@ -190,4 +224,11 @@ public class Request {
public static RequestBuilder builder(HttpMethod httpMethod) {
return new RequestBuilder().setMethod(httpMethod);
}
+
+ @Override
+ public void close() throws IOException {
+ if (content != null) {
+ content.release();
+ }
+ }
}
diff --git a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java
index a895b8e..b8dfb5f 100644
--- a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java
+++ b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java
@@ -16,6 +16,7 @@ import io.netty.util.AsciiString;
import org.xbib.net.QueryParameters;
import org.xbib.net.URL;
import org.xbib.net.URLSyntaxException;
+import org.xbib.netty.http.client.retry.BackOff;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -44,7 +45,7 @@ public class RequestBuilder {
private static final boolean DEFAULT_FOLLOW_REDIRECT = true;
- private static final int DEFAULT_TIMEOUT_MILLIS = 5000;
+ private static final long DEFAULT_TIMEOUT_MILLIS = -1L;
private static final int DEFAULT_MAX_REDIRECT = 10;
@@ -74,12 +75,16 @@ public class RequestBuilder {
private ByteBuf content;
- private int timeout;
+ private long timeoutInMillis;
private boolean followRedirect;
private int maxRedirects;
+ private boolean enableBackOff;
+
+ private BackOff backOff;
+
RequestBuilder() {
httpMethod = DEFAULT_METHOD;
httpVersion = DEFAULT_HTTP_VERSION;
@@ -87,7 +92,7 @@ public class RequestBuilder {
gzip = DEFAULT_GZIP;
keepalive = DEFAULT_KEEPALIVE;
url = DEFAULT_URL;
- timeout = DEFAULT_TIMEOUT_MILLIS;
+ timeoutInMillis = DEFAULT_TIMEOUT_MILLIS;
followRedirect = DEFAULT_FOLLOW_REDIRECT;
maxRedirects = DEFAULT_MAX_REDIRECT;
headers = new DefaultHttpHeaders();
@@ -121,8 +126,8 @@ public class RequestBuilder {
return this;
}
- public RequestBuilder setTimeout(int timeout) {
- this.timeout = timeout;
+ public RequestBuilder setTimeoutInMillis(long timeoutInMillis) {
+ this.timeoutInMillis = timeoutInMillis;
return this;
}
@@ -207,6 +212,16 @@ public class RequestBuilder {
return this;
}
+ public RequestBuilder enableBackOff(boolean enableBackOff) {
+ this.enableBackOff = enableBackOff;
+ return this;
+ }
+
+ public RequestBuilder setBackOff(BackOff backOff) {
+ this.backOff = backOff;
+ return this;
+ }
+
public RequestBuilder setUserAgent(String userAgent) {
this.userAgent = userAgent;
return this;
@@ -255,14 +270,10 @@ public class RequestBuilder {
throw new IllegalStateException("host in URL not defined: " + url);
}
if (uri != null) {
- if (this.url != null) {
- try {
- url = URL.base(url).resolve(uri);
- } catch (URLSyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- } else {
- url(uri);
+ try {
+ url = URL.base(url).resolve(uri);
+ } catch (URLSyntaxException e) {
+ throw new IllegalArgumentException(e);
}
}
// add explicit parameters to URL
@@ -320,7 +331,7 @@ public class RequestBuilder {
validatedHeaders.remove(headerName);
}
return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, uri, content,
- timeout, followRedirect, maxRedirects, 0);
+ timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff);
}
private void addHeader(AsciiString name, Object value) {
diff --git a/src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java b/src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java
deleted file mode 100644
index fcb52db..0000000
--- a/src/main/java/org/xbib/netty/http/client/handler/UserEventLogger.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.xbib.netty.http.client.handler;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.socket.ChannelInputShutdownReadComplete;
-import io.netty.handler.ssl.SslCloseCompletionEvent;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A Netty handler that logs user events and find expetced ones.
- */
-@ChannelHandler.Sharable
-class UserEventLogger extends ChannelInboundHandlerAdapter {
-
- private static final Logger logger = Logger.getLogger(UserEventLogger.class.getName());
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- logger.log(Level.FINE, () -> "got user event " + evt);
- if (evt instanceof SslCloseCompletionEvent ||
- evt instanceof ChannelInputShutdownReadComplete) {
- logger.log(Level.FINE, () -> "user event is expected: " + evt);
- return;
- }
- super.userEventTriggered(ctx, evt);
- }
-}
diff --git a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java
index 850d17d..399083d 100644
--- a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java
+++ b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java
@@ -6,87 +6,59 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslHandler;
import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.HttpAddress;
-import org.xbib.netty.http.client.handler.TrafficLoggingHandler;
-import javax.net.ssl.SNIHostName;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLParameters;
-import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
public class HttpChannelInitializer extends ChannelInitializer {
+ private static final Logger logger = Logger.getLogger(HttpChannelInitializer.class.getName());
+
private final ClientConfig clientConfig;
private final HttpAddress httpAddress;
+ private final SslHandler sslHandler;
+
private final HttpResponseHandler httpResponseHandler;
- public HttpChannelInitializer(ClientConfig clientConfig, HttpAddress httpAddress, HttpResponseHandler httpResponseHandler) {
+ public HttpChannelInitializer(ClientConfig clientConfig,
+ HttpAddress httpAddress,
+ SslHandler sslHandler,
+ HttpResponseHandler httpResponseHandler) {
this.clientConfig = clientConfig;
this.httpAddress = httpAddress;
+ this.sslHandler = sslHandler;
this.httpResponseHandler = httpResponseHandler;
}
@Override
- protected void initChannel(SocketChannel ch) {
+ public void initChannel(SocketChannel channel) {
if (clientConfig.isDebug()) {
- ch.pipeline().addLast(new TrafficLoggingHandler());
+ channel.pipeline().addLast(new TrafficLoggingHandler(LogLevel.DEBUG));
}
if (httpAddress.isSecure()) {
- configureEncryptedHttp1(ch);
+ configureEncrypted(channel);
} else {
- configureCleartextHttp1(ch);
+ configureCleartext(channel);
+ }
+ if (clientConfig.isDebug()) {
+ logger.log(Level.FINE, "HTTP 1 channel initialized: " + channel.pipeline().names());
}
}
- private void configureEncryptedHttp1(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
- try {
- SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
- .sslProvider(clientConfig.getSslProvider())
- .keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(),
- clientConfig.getKeyPassword())
- .ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter());
- if (clientConfig.getSslContextProvider() != null) {
- sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
- }
- if (clientConfig.getTrustManagerFactory() != null) {
- sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
- }
- SslContext sslContext = sslContextBuilder.build();
- SslHandler sslHandler = sslContext.newHandler(ch.alloc());
- SSLEngine engine = sslHandler.engine();
- if (clientConfig.isServerNameIdentification()) {
- String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName();
- SSLParameters params = engine.getSSLParameters();
- params.setServerNames(Collections.singletonList(new SNIHostName(fullQualifiedHostname)));
- engine.setSSLParameters(params);
- }
- pipeline.addLast(sslHandler);
- switch (clientConfig.getClientAuthMode()) {
- case NEED:
- engine.setNeedClientAuth(true);
- break;
- case WANT:
- engine.setWantClientAuth(true);
- break;
- default:
- break;
- }
- } catch (SSLException e) {
- throw new IllegalStateException("unable to configure SSL: " + e.getMessage(), e);
- }
- configureCleartextHttp1(ch);
+ private void configureEncrypted(SocketChannel channel) {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast(sslHandler);
+ configureCleartext(channel);
}
- private void configureCleartextHttp1(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
+ private void configureCleartext(SocketChannel channel) {
+ ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpClientCodec(clientConfig.getMaxInitialLineLength(),
clientConfig.getMaxHeadersSize(), clientConfig.getMaxChunkSize()));
if (clientConfig.isEnableGzip()) {
diff --git a/src/main/java/org/xbib/netty/http/client/handler/TrafficLoggingHandler.java b/src/main/java/org/xbib/netty/http/client/handler/http1/TrafficLoggingHandler.java
similarity index 88%
rename from src/main/java/org/xbib/netty/http/client/handler/TrafficLoggingHandler.java
rename to src/main/java/org/xbib/netty/http/client/handler/http1/TrafficLoggingHandler.java
index bd62909..333a2f0 100644
--- a/src/main/java/org/xbib/netty/http/client/handler/TrafficLoggingHandler.java
+++ b/src/main/java/org/xbib/netty/http/client/handler/http1/TrafficLoggingHandler.java
@@ -1,4 +1,4 @@
-package org.xbib.netty.http.client.handler;
+package org.xbib.netty.http.client.handler.http1;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
@@ -13,8 +13,8 @@ import io.netty.handler.logging.LoggingHandler;
@ChannelHandler.Sharable
public class TrafficLoggingHandler extends LoggingHandler {
- public TrafficLoggingHandler() {
- super("client", LogLevel.TRACE);
+ public TrafficLoggingHandler(LogLevel level) {
+ super("client", level);
}
@Override
diff --git a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java
index c4493aa..c3c340b 100644
--- a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java
+++ b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java
@@ -7,25 +7,14 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameLogger;
-import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.HttpAddress;
-import javax.net.ssl.SNIHostName;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLParameters;
-import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -37,30 +26,69 @@ public class Http2ChannelInitializer extends ChannelInitializer {
private final HttpAddress httpAddress;
+ private final SslHandler sslHandler;
+
private final Http2SettingsHandler http2SettingsHandler;
private final Http2ResponseHandler http2ResponseHandler;
public Http2ChannelInitializer(ClientConfig clientConfig,
HttpAddress httpAddress,
+ SslHandler sslHandler,
Http2SettingsHandler http2SettingsHandler,
Http2ResponseHandler http2ResponseHandler) {
this.clientConfig = clientConfig;
this.httpAddress = httpAddress;
+ this.sslHandler = sslHandler;
this.http2SettingsHandler = http2SettingsHandler;
this.http2ResponseHandler = http2ResponseHandler;
}
/**
- * The channel initialization for HTTP/2 is always encrypted.
- * The reason is there is no known HTTP/2 server supporting cleartext.
+ * The channel initialization for HTTP/2.
*
- * @param ch socket channel
+ * @param channel socket channel
*/
@Override
- protected void initChannel(SocketChannel ch) {
+ public void initChannel(SocketChannel channel) {
+ if (httpAddress.isSecure()) {
+ configureEncrypted(channel);
+ } else {
+ configureCleartext(channel);
+ }
+ if (clientConfig.isDebug()) {
+ logger.log(Level.FINE, "HTTP/2 channel initialized: " + channel.pipeline().names());
+ }
+ }
+
+ private void configureEncrypted(SocketChannel channel) {
+ channel.pipeline().addLast(sslHandler);
+ ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler("") {
+ @Override
+ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
+ if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
+ ctx.pipeline().addLast(newConnectionHandler(), http2SettingsHandler, http2ResponseHandler);
+ if (clientConfig.isDebug()) {
+ logger.log(Level.FINE, "after negotiation: " + ctx.pipeline().names());
+ }
+ return;
+ }
+ // we do not fall back to HTTP1
+ ctx.close();
+ throw new IllegalStateException("protocol not accepted: " + protocol);
+ }
+ };
+ channel.pipeline().addLast(negotiationHandler);
+}
+
+ private void configureCleartext(SocketChannel ch) {
+ ch.pipeline().addLast(newConnectionHandler(), http2SettingsHandler, http2ResponseHandler);
+ }
+
+ private Http2ConnectionHandler newConnectionHandler() {
Http2Connection http2Connection = new DefaultHttp2Connection(false);
HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder()
+ .initialSettings(clientConfig.getHttp2Settings())
.connection(http2Connection)
.frameListener(new Http2PushPromiseHandler(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection)
@@ -68,48 +96,8 @@ public class Http2ChannelInitializer extends ChannelInitializer {
.propagateSettings(true)
.build()));
if (clientConfig.isDebug()) {
- http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client"));
- }
- Http2ConnectionHandler http2ConnectionHandler = http2ConnectionHandlerBuilder.build();
- try {
- SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
- .sslProvider(clientConfig.getSslProvider())
- .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
- .applicationProtocolConfig(new ApplicationProtocolConfig(
- ApplicationProtocolConfig.Protocol.ALPN,
- ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
- ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
- ApplicationProtocolNames.HTTP_2));
- if (clientConfig.getSslContextProvider() != null) {
- sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
- }
- if (clientConfig.getTrustManagerFactory() != null) {
- sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
- }
- SslContext sslContext = sslContextBuilder.build();
- SslHandler sslHandler = sslContext.newHandler(ch.alloc());
- SSLEngine engine = sslHandler.engine();
- if (clientConfig.isServerNameIdentification()) {
- String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName();
- SSLParameters params = engine.getSSLParameters();
- params.setServerNames(Collections.singletonList(new SNIHostName(fullQualifiedHostname)));
- engine.setSSLParameters(params);
- }
- ch.pipeline().addLast(sslHandler);
- ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler("") {
- @Override
- protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
- if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
- ctx.pipeline().addLast(http2ConnectionHandler, http2SettingsHandler, http2ResponseHandler);
- return;
- }
- ctx.close();
- throw new IllegalStateException("unknown protocol: " + protocol);
- }
- };
- ch.pipeline().addLast(negotiationHandler);
- } catch (SSLException e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
+ http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(clientConfig.getDebugLogLevel(), "client"));
}
+ return http2ConnectionHandlerBuilder.build();
}
}
diff --git a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java b/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java
similarity index 60%
rename from src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java
rename to src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java
index 26cb3ed..87134bb 100644
--- a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java
+++ b/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java
@@ -6,14 +6,18 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
@@ -22,12 +26,16 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class SimpleChannelPool implements Pool {
+public class BoundedChannelPool implements Pool {
- private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName());
+ private static final Logger logger = Logger.getLogger(BoundedChannelPool.class.getName());
private final Semaphore semaphore;
+ private final HttpVersion httpVersion;
+
+ private final boolean isSecure;
+
private final ChannelPoolHandler channelPoolhandler;
private final List nodes;
@@ -52,6 +60,8 @@ public class SimpleChannelPool implements Pool {
/**
* @param semaphore the concurrency level
+ * @param httpVersion the HTTP version of the pool connections
+ * @param isSecure if this pool has secure connections
* @param nodes the endpoint nodes, any element may contain the port (followed after ":")
* to override the defaultPort argument
* @param bootstrap bootstrap instance
@@ -59,16 +69,19 @@ public class SimpleChannelPool implements Pool {
* @param retriesPerNode the max count of the subsequent connection failures to the node before
* the node will be excluded from the pool. If set to 0, the value is ignored.
*/
- public SimpleChannelPool(Semaphore semaphore, List nodes, Bootstrap bootstrap,
- ChannelPoolHandler channelPoolHandler, int retriesPerNode) {
+ public BoundedChannelPool(Semaphore semaphore, HttpVersion httpVersion, boolean isSecure,
+ List nodes, Bootstrap bootstrap,
+ ChannelPoolHandler channelPoolHandler, int retriesPerNode) {
this.semaphore = semaphore;
+ this.httpVersion = httpVersion;
+ this.isSecure = isSecure;
this.channelPoolhandler = channelPoolHandler;
this.nodes = nodes;
this.retriesPerNode = retriesPerNode;
this.lock = new ReentrantLock();
this.attributeKey = AttributeKey.valueOf("poolKey");
if (nodes == null || nodes.isEmpty()) {
- throw new IllegalArgumentException("empty nodes array argument");
+ throw new IllegalArgumentException("nodes must not be empty");
}
this.numberOfNodes = nodes.size();
bootstraps = new HashMap<>(numberOfNodes);
@@ -77,46 +90,48 @@ public class SimpleChannelPool implements Pool {
counts = new HashMap<>(numberOfNodes);
failedCounts = new HashMap<>(numberOfNodes);
for (K node : nodes) {
+ ChannelPoolInitializer initializer = new ChannelPoolInitializer(node, channelPoolHandler);
bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress())
- .handler(new ChannelInitializer() {
- @Override
- protected void initChannel(Channel channel) throws Exception {
- if(!channel.eventLoop().inEventLoop()) {
- throw new IllegalStateException();
- }
- if (channelPoolHandler != null) {
- channelPoolHandler.channelCreated(channel);
- }
- }
- }));
+ .handler(initializer));
availableChannels.put(node, new ConcurrentLinkedQueue<>());
counts.put(node, 0);
failedCounts.put(node, 0);
}
}
+ public HttpVersion getVersion() {
+ return httpVersion;
+ }
+
+ public boolean isSecure() {
+ return isSecure;
+ }
+
+ public AttributeKey getAttributeKey() {
+ return attributeKey;
+ }
+
@Override
- public void prepare(int count) throws ConnectException {
- if (count > 0) {
- for (int i = 0; i < count; i ++) {
- Channel channel = connectToAnyNode();
- if (channel == null) {
- throw new ConnectException("failed to prepare the connections");
- }
- K nodeAddr = channel.attr(attributeKey).get();
- if (channel.isActive()) {
- Queue channelQueue = availableChannels.get(nodeAddr);
- if (channelQueue != null) {
- channelQueue.add(channel);
- }
- } else {
- channel.close();
- }
- }
- logger.log(Level.FINE,"prepared " + count + " connections");
- } else {
- throw new IllegalArgumentException("Connection count should be > 0, but got " + count);
+ public void prepare(int channelCount) throws ConnectException {
+ if (channelCount <= 0) {
+ throw new IllegalArgumentException("channel count must be greater zero, but got " + channelCount);
}
+ for (int i = 0; i < channelCount; i++) {
+ Channel channel = newConnection();
+ if (channel == null) {
+ throw new ConnectException("failed to prepare");
+ }
+ K key = channel.attr(attributeKey).get();
+ if (channel.isActive()) {
+ Queue channelQueue = availableChannels.get(key);
+ if (channelQueue != null) {
+ channelQueue.add(channel);
+ }
+ } else {
+ channel.close();
+ }
+ }
+ logger.log(Level.FINE,"prepared " + channelCount + " channels");
}
@Override
@@ -124,7 +139,7 @@ public class SimpleChannelPool implements Pool {
Channel channel = null;
if (semaphore.tryAcquire()) {
if ((channel = poll()) == null) {
- channel = connectToAnyNode();
+ channel = newConnection();
}
if (channel == null) {
semaphore.release();
@@ -150,7 +165,7 @@ public class SimpleChannelPool implements Pool {
Channel channel;
for (int i = 0; i < availableCount; i ++) {
if (null == (channel = poll())) {
- channel = connectToAnyNode();
+ channel = newConnection();
}
if (channel == null) {
semaphore.release(availableCount - i);
@@ -167,18 +182,23 @@ public class SimpleChannelPool implements Pool {
@Override
public void release(Channel channel) throws Exception {
- K nodeAddr = channel.attr(attributeKey).get();
- if (channel.isActive()) {
- Queue channelQueue = availableChannels.get(nodeAddr);
- if (channelQueue != null) {
- channelQueue.add(channel);
+ try {
+ if (channel != null) {
+ if (channel.isActive()) {
+ K key = channel.attr(attributeKey).get();
+ Queue channelQueue = availableChannels.get(key);
+ if (channelQueue != null) {
+ channelQueue.add(channel);
+ }
+ } else if (channel.isOpen()) {
+ channel.close();
+ }
+ if (channelPoolhandler != null) {
+ channelPoolhandler.channelReleased(channel);
+ }
}
+ } finally {
semaphore.release();
- } else {
- channel.close();
- }
- if (channelPoolhandler != null) {
- channelPoolhandler.channelReleased(channel);
}
}
@@ -193,65 +213,63 @@ public class SimpleChannelPool implements Pool {
public void close() {
lock.lock();
try {
- int closedConnCount = 0;
- for (K nodeAddr : availableChannels.keySet()) {
- for (Channel conn : availableChannels.get(nodeAddr)) {
- if (conn.isOpen()) {
- conn.close();
- closedConnCount++;
- }
+ int count = 0;
+ Set channelSet = new HashSet<>();
+ for (Map.Entry> entry : availableChannels.entrySet()) {
+ channelSet.addAll(entry.getValue());
+ }
+ for (Map.Entry> entry : channels.entrySet()) {
+ channelSet.addAll(entry.getValue());
+ }
+ for (Channel channel : channelSet) {
+ if (channel != null && channel.isOpen()) {
+ logger.log(Level.FINE, "closing channel " + channel);
+ channel.close();
+ count++;
}
}
availableChannels.clear();
- for (K nodeAddr : channels.keySet()) {
- for (Channel channel : channels.get(nodeAddr)) {
- if (channel != null && channel.isOpen()) {
- channel.close();
- closedConnCount++;
- }
- }
- }
channels.clear();
bootstraps.clear();
counts.clear();
- logger.log(Level.FINE, "closed " + closedConnCount + " connections");
+ logger.log(Level.FINE, "closed " + count + " connections");
} finally {
lock.unlock();
}
}
- private Channel connectToAnyNode() throws ConnectException {
+ private Channel newConnection() throws ConnectException {
Channel channel = null;
- K nodeAddr = null;
- K nextNodeAddr;
+ K key = null;
+ K nextKey;
int min = Integer.MAX_VALUE;
int next;
int i = ThreadLocalRandom.current().nextInt(numberOfNodes);
for (int j = i; j < numberOfNodes; j ++) {
- nextNodeAddr = nodes.get(j % numberOfNodes);
- next = counts.get(nextNodeAddr);
- if(next == 0) {
- nodeAddr = nextNodeAddr;
+ nextKey = nodes.get(j % numberOfNodes);
+ next = counts.get(nextKey);
+ if (next == 0) {
+ key = nextKey;
break;
} else if (next < min) {
min = next;
- nodeAddr = nextNodeAddr;
+ key = nextKey;
}
}
- if (nodeAddr != null) {
- logger.log(Level.FINE, "trying connection to " + nodeAddr);
+ if (key != null) {
+ logger.log(Level.FINE, "trying connection to " + key);
try {
- channel = connect(nodeAddr);
+ channel = connect(key);
} catch (Exception e) {
- logger.log(Level.WARNING, "failed to create a new connection to " + nodeAddr + ": " + e.toString());
+ logger.log(Level.WARNING, "failed to create a new connection to " + key + ": " + e.toString());
if (retriesPerNode > 0) {
- int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1;
- failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount);
+ int selectedNodeFailedConnAttemptsCount = failedCounts.get(key) + 1;
+ failedCounts.put(key, selectedNodeFailedConnAttemptsCount);
if (selectedNodeFailedConnAttemptsCount > retriesPerNode) {
- logger.log(Level.WARNING, "failed to connect to the node " + nodeAddr + " "
+ logger.log(Level.WARNING, "failed to connect to the node " + key + " "
+ selectedNodeFailedConnAttemptsCount + " times, "
+ "excluding the node from the connection pool");
- counts.put(nodeAddr, Integer.MAX_VALUE);
+ counts.put(key, Integer.MAX_VALUE);
boolean allNodesExcluded = true;
for (K node : nodes) {
if (counts.get(node) < Integer.MAX_VALUE) {
@@ -272,22 +290,22 @@ public class SimpleChannelPool implements Pool {
}
}
if (channel != null) {
- channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel));
- channel.attr(attributeKey).set(nodeAddr);
- channels.computeIfAbsent(nodeAddr, node -> new ArrayList<>()).add(channel);
+ channel.closeFuture().addListener(new CloseChannelListener(key, channel));
+ channel.attr(attributeKey).set(key);
+ channels.computeIfAbsent(key, node -> new ArrayList<>()).add(channel);
synchronized (counts) {
- counts.put(nodeAddr, counts.get(nodeAddr) + 1);
+ counts.put(key, counts.get(key) + 1);
}
- if(retriesPerNode > 0) {
- failedCounts.put(nodeAddr, 0);
+ if (retriesPerNode > 0) {
+ failedCounts.put(key, 0);
}
- logger.log(Level.FINE,"new connection to " + nodeAddr + " created");
+ logger.log(Level.FINE,"new connection to " + key + " created");
}
return channel;
}
- private Channel connect(K addr) throws Exception {
- Bootstrap bootstrap = bootstraps.get(addr);
+ private Channel connect(K key) throws Exception {
+ Bootstrap bootstrap = bootstraps.get(key);
if (bootstrap != null) {
return bootstrap.connect().sync().channel();
}
@@ -312,26 +330,26 @@ public class SimpleChannelPool implements Pool {
private class CloseChannelListener implements ChannelFutureListener {
- private final K nodeAddr;
+ private final K key;
private final Channel channel;
- private CloseChannelListener(K nodeAddr, Channel channel) {
- this.nodeAddr = nodeAddr;
+ private CloseChannelListener(K key, Channel channel) {
+ this.key = key;
this.channel = channel;
}
@Override
public void operationComplete(ChannelFuture future) {
- logger.log(Level.FINE,"connection to " + nodeAddr + " closed");
+ logger.log(Level.FINE,"connection to " + key + " closed");
lock.lock();
try {
synchronized (counts) {
- if (counts.containsKey(nodeAddr)) {
- counts.put(nodeAddr, counts.get(nodeAddr) - 1);
+ if (counts.containsKey(key)) {
+ counts.put(key, counts.get(key) - 1);
}
}
synchronized (channels) {
- List channels = SimpleChannelPool.this.channels.get(nodeAddr);
+ List channels = BoundedChannelPool.this.channels.get(key);
if (channels != null) {
channels.remove(channel);
}
@@ -342,4 +360,27 @@ public class SimpleChannelPool implements Pool {
}
}
}
+
+ class ChannelPoolInitializer extends ChannelInitializer {
+
+ private final K key;
+
+ private final ChannelPoolHandler channelPoolHandler;
+
+ ChannelPoolInitializer(K key, ChannelPoolHandler channelPoolHandler) {
+ this.key = key;
+ this.channelPoolHandler = channelPoolHandler;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ if (!channel.eventLoop().inEventLoop()) {
+ throw new IllegalStateException();
+ }
+ channel.attr(attributeKey).set(key);
+ if (channelPoolHandler != null) {
+ channelPoolHandler.channelCreated(channel);
+ }
+ }
+ }
}
diff --git a/src/main/java/org/xbib/netty/http/client/rest/RestClient.java b/src/main/java/org/xbib/netty/http/client/rest/RestClient.java
index a25870f..6ba3742 100644
--- a/src/main/java/org/xbib/netty/http/client/rest/RestClient.java
+++ b/src/main/java/org/xbib/netty/http/client/rest/RestClient.java
@@ -36,7 +36,13 @@ public class RestClient {
public String asString() {
ByteBuf byteBuf = response != null ? response.content() : null;
- return byteBuf != null && byteBuf.isReadable() ? response.content().toString(StandardCharsets.UTF_8) : null;
+ try {
+ return byteBuf != null && byteBuf.isReadable() ? response.content().toString(StandardCharsets.UTF_8) : null;
+ } finally {
+ if (byteBuf != null) {
+ byteBuf.release();
+ }
+ }
}
public static RestClient get(String urlString) throws IOException {
diff --git a/src/main/java/org/xbib/netty/http/client/retry/BackOff.java b/src/main/java/org/xbib/netty/http/client/retry/BackOff.java
index 4025363..b346438 100644
--- a/src/main/java/org/xbib/netty/http/client/retry/BackOff.java
+++ b/src/main/java/org/xbib/netty/http/client/retry/BackOff.java
@@ -14,7 +14,7 @@ public interface BackOff {
/**
* Reset to initial state.
*/
- void reset() throws IOException;
+ void reset();
/**
* Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
@@ -33,7 +33,7 @@ public interface BackOff {
}
*
*/
- long nextBackOffMillis() throws IOException;
+ long nextBackOffMillis();
/**
* Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried
diff --git a/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java b/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java
index dd0638a..8f20b8c 100644
--- a/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java
+++ b/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java
@@ -137,22 +137,19 @@ public class ExponentialBackOff implements BackOff {
/**
* @param builder builder
*/
- protected ExponentialBackOff(Builder builder) {
+ private ExponentialBackOff(Builder builder) {
initialIntervalMillis = builder.initialIntervalMillis;
randomizationFactor = builder.randomizationFactor;
multiplier = builder.multiplier;
maxIntervalMillis = builder.maxIntervalMillis;
maxElapsedTimeMillis = builder.maxElapsedTimeMillis;
nanoClock = builder.nanoClock;
- //Preconditions.checkArgument(initialIntervalMillis > 0);
- //Preconditions.checkArgument(0 <= randomizationFactor && randomizationFactor < 1);
- //Preconditions.checkArgument(multiplier >= 1);
- //Preconditions.checkArgument(maxIntervalMillis >= initialIntervalMillis);
- //Preconditions.checkArgument(maxElapsedTimeMillis > 0);
reset();
}
- /** Sets the interval back to the initial retry interval and restarts the timer. */
+ /**
+ * Sets the interval back to the initial retry interval and restarts the timer.
+ */
public final void reset() {
currentIntervalMillis = initialIntervalMillis;
startTimeNanos = nanoClock.nanoTime();
@@ -275,6 +272,29 @@ public class ExponentialBackOff implements BackOff {
}
}
+ /**
+ * Nano clock which can be used to measure elapsed time in nanoseconds.
+ *
+ *
+ * The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations
+ * may be used for testing.
+ *
+ *
+ */
+ public interface NanoClock {
+
+ /**
+ * Returns the current value of the most precise available system timer, in nanoseconds for use to
+ * measure elapsed time, to match the behavior of {@link System#nanoTime()}.
+ */
+ long nanoTime();
+
+ /**
+ * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
+ */
+ NanoClock SYSTEM = System::nanoTime;
+ }
+
/**
* Builder for {@link ExponentialBackOff}.
*
@@ -285,7 +305,7 @@ public class ExponentialBackOff implements BackOff {
public static class Builder {
/** The initial retry interval in milliseconds. */
- int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS;
+ private int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS;
/**
* The randomization factor to use for creating a range around the retry interval.
@@ -295,32 +315,53 @@ public class ExponentialBackOff implements BackOff {
* above the retry interval.
*
*/
- double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR;
+ private double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR;
- /** The value to multiply the current interval with for each retry attempt. */
- double multiplier = DEFAULT_MULTIPLIER;
+ /**
+ * The value to multiply the current interval with for each retry attempt.
+ */
+ private double multiplier = DEFAULT_MULTIPLIER;
/**
* The maximum value of the back off period in milliseconds. Once the retry interval reaches
* this value it stops increasing.
*/
- int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS;
+ private int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS;
/**
* The maximum elapsed time in milliseconds after instantiating {@link ExponentialBackOff} or
* calling {@link #reset()} after which {@link #nextBackOffMillis()} returns
* {@link BackOff#STOP}.
*/
- int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS;
+ private int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS;
- /** Nano clock. */
- NanoClock nanoClock = NanoClock.SYSTEM;
+ /**
+ * Nano clock.
+ */
+ private NanoClock nanoClock = NanoClock.SYSTEM;
public Builder() {
}
- /** Builds a new instance of {@link ExponentialBackOff}. */
+ /**
+ * Builds a new instance of {@link ExponentialBackOff}.
+ * */
public ExponentialBackOff build() {
+ if (initialIntervalMillis <= 0) {
+ throw new IllegalArgumentException();
+ }
+ if (!(0 <= randomizationFactor && randomizationFactor < 1)) {
+ throw new IllegalArgumentException();
+ }
+ if (multiplier < 1) {
+ throw new IllegalArgumentException();
+ }
+ if ((maxIntervalMillis < initialIntervalMillis)) {
+ throw new IllegalArgumentException();
+ }
+ if (maxElapsedTimeMillis <= 0) {
+ throw new IllegalArgumentException();
+ }
return new ExponentialBackOff(this);
}
@@ -480,7 +521,9 @@ public class ExponentialBackOff implements BackOff {
*
*/
public Builder setNanoClock(NanoClock nanoClock) {
- this.nanoClock = nanoClock; //Preconditions.checkNotNull(nanoClock);
+ if (nanoClock != null) {
+ this.nanoClock = nanoClock;
+ }
return this;
}
}
diff --git a/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java b/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java
deleted file mode 100644
index 11e8e13..0000000
--- a/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2013 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.xbib.netty.http.client.retry;
-
-/**
- * Nano clock which can be used to measure elapsed time in nanoseconds.
- *
- *
- * The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations
- * may be used for testing.
- *
- *
- * @since 1.14
- * @author Yaniv Inbar
- */
-public interface NanoClock {
-
- /**
- * Returns the current value of the most precise available system timer, in nanoseconds for use to
- * measure elapsed time, to match the behavior of {@link System#nanoTime()}.
- */
- long nanoTime();
-
- /**
- * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
- */
- NanoClock SYSTEM = new NanoClock() {
- public long nanoTime() {
- return System.nanoTime();
- }
- };
-}
diff --git a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java
index 617afdc..98c8ff6 100644
--- a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java
+++ b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java
@@ -5,7 +5,9 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.HttpConversionUtil;
@@ -16,6 +18,9 @@ import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.RequestBuilder;
+import org.xbib.netty.http.client.listener.CookieListener;
+import org.xbib.netty.http.client.listener.HttpHeadersListener;
+import org.xbib.netty.http.client.retry.BackOff;
import java.io.IOException;
import java.net.ConnectException;
@@ -48,6 +53,8 @@ abstract class BaseTransport implements Transport {
protected SortedMap requests;
+ protected Throwable throwable;
+
private Map cookieBox;
BaseTransport(Client client, HttpAddress httpAddress) {
@@ -56,43 +63,55 @@ abstract class BaseTransport implements Transport {
this.requests = new ConcurrentSkipListMap<>();
}
- @Override
- public HttpAddress httpAddress() {
- return httpAddress;
- }
-
@Override
public Transport execute(Request request) throws IOException {
ensureConnect();
- // some HTTP 1.1 servers like Elasticsearch do not understand full URIs in HTTP command line
- String uri = request.httpVersion().majorVersion() < 2 ?
+ if (throwable != null) {
+ return this;
+ }
+ // Some HTTP 1 servers do not understand URIs in HTTP command line in spite of RFC 7230.
+ // The "origin form" requires a "Host" header.
+ // Our algorithm is: use always "origin form" for HTTP 1, use absolute form for HTTP 2.
+ // The reason is that Netty derives the HTTP/2 scheme header from the absolute form.
+ String uri = request.httpVersion().majorVersion() == 1 ?
request.base().relativeReference() : request.base().toString();
FullHttpRequest fullHttpRequest = request.content() == null ?
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) :
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri,
request.content());
- logger.log(Level.INFO, fullHttpRequest.toString());
- Integer streamId = nextStream();
- if (streamId != null) {
- request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
+ try {
+ Integer streamId = nextStream();
+ if (streamId != null && streamId > 0) {
+ request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
+ } else {
+ if (request.httpVersion().majorVersion() == 2) {
+ logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName());
+ }
+ }
+ // add matching cookies from box (previous requests) and new cookies from request builder
+ Collection cookies = new ArrayList<>();
+ cookies.addAll(matchCookiesFromBox(request));
+ cookies.addAll(matchCookies(request));
+ if (!cookies.isEmpty()) {
+ request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
+ }
+ // add stream-id and cookie headers
+ fullHttpRequest.headers().set(request.headers());
+ if (streamId != null) {
+ requests.put(streamId, request);
+ }
+ if (channel.isWritable()) {
+ channel.writeAndFlush(fullHttpRequest);
+
+ }
+ } finally {
+ request.close();
}
- // add matching cookies from box (previous requests) and new cookies from request builder
- Collection cookies = new ArrayList<>();
- cookies.addAll(matchCookiesFromBox(request));
- cookies.addAll(matchCookies(request));
- if (!cookies.isEmpty()) {
- request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
- }
- // add stream-id and cookie headers
- fullHttpRequest.headers().set(request.headers());
- requests.put(streamId, request);
- logger.log(Level.FINE, () -> "streamId = " + streamId + " writing request = " + fullHttpRequest);
- channel.writeAndFlush(fullHttpRequest);
return this;
}
/**
- * Experimental.
+ * Experimental method for executing in a wrapping completable future.
* @param request request
* @param supplier supplier
* @param supplier result
@@ -102,44 +121,86 @@ abstract class BaseTransport implements Transport {
public CompletableFuture execute(Request request,
Function supplier) throws IOException {
final CompletableFuture completableFuture = new CompletableFuture<>();
- //request.setExceptionListener(completableFuture::completeExceptionally);
request.setResponseListener(response -> completableFuture.complete(supplier.apply(response)));
execute(request);
return completableFuture;
}
@Override
- public synchronized void close() {
+ public synchronized void close() throws IOException {
get();
- if (channel != null) {
- channel.close();
- channel = null;
- }
+ client.releaseChannel(channel);
}
- protected void ensureConnect() throws IOException {
- if (channel == null) {
- try {
- channel = client.newChannel(httpAddress);
- channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
- awaitSettings();
- } catch (InterruptedException e) {
- throw new ConnectException("unable to connect to " + httpAddress);
+ @Override
+ public boolean isFailed() {
+ return throwable != null;
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return throwable;
+ }
+
+ @Override
+ public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
+ Request request = fromStreamId(streamId);
+ if (request != null) {
+ HttpHeadersListener httpHeadersListener = request.getHeadersListener();
+ if (httpHeadersListener != null) {
+ httpHeadersListener.onHeaders(httpHeaders);
+ }
+ for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
+ Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
+ addCookie(cookie);
+ CookieListener cookieListener = request.getCookieListener();
+ if (cookieListener != null) {
+ cookieListener.onCookie(cookie);
+ }
}
}
}
- protected Request continuation(Integer streamId, FullHttpResponse httpResponse) throws URLSyntaxException {
+ private void ensureConnect() throws IOException {
+ if (channel == null) {
+ channel = client.newChannel(httpAddress);
+ if (channel != null) {
+ channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
+ awaitSettings();
+ } else {
+ ConnectException connectException;
+ if (httpAddress != null) {
+ connectException = new ConnectException("unable to connect to " + httpAddress);
+ } else if (client.hasPooledConnections()){
+ connectException = new ConnectException("unable to get channel from pool");
+ } else {
+ // if API misuse
+ connectException = new ConnectException("unable to get channel");
+ }
+ this.throwable = connectException;
+ this.channel = null;
+ throw connectException;
+ }
+ }
+ }
+
+ protected Request fromStreamId(Integer streamId) {
+ if (streamId == null) {
+ streamId = requests.lastKey();
+ }
+ return requests.get(streamId);
+ }
+
+ protected Request continuation(Request request, FullHttpResponse httpResponse) throws URLSyntaxException {
if (httpResponse == null) {
return null;
}
- Request request = fromStreamId(streamId);
if (request == null) {
- // push promise
+ // push promise or something else
return null;
}
try {
- if (request.checkRedirect()) {
+ if (request.canRedirect()) {
int status = httpResponse.status().code();
switch (status) {
case 300:
@@ -152,7 +213,7 @@ abstract class BaseTransport implements Transport {
String location = httpResponse.headers().get(HttpHeaderNames.LOCATION);
location = new PercentDecoder(StandardCharsets.UTF_8.newDecoder()).decode(location);
if (location != null) {
- logger.log(Level.INFO, "found redirect location: " + location);
+ logger.log(Level.FINE, "found redirect location: " + location);
URL redirUrl = URL.base(request.base()).resolve(location);
HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod();
RequestBuilder newHttpRequestBuilder = Request.builder(method)
@@ -160,45 +221,75 @@ abstract class BaseTransport implements Transport {
.setVersion(request.httpVersion())
.setHeaders(request.headers())
.content(request.content());
- // TODO(jprante) convencience to copy pathAndQuery from one request to another
request.base().getQueryParams().forEach(pair ->
newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond())
);
request.cookies().forEach(newHttpRequestBuilder::addCookie);
Request newHttpRequest = newHttpRequestBuilder.build();
newHttpRequest.setResponseListener(request.getResponseListener());
- //newHttpRequest.setExceptionListener(request.getExceptionListener());
newHttpRequest.setHeadersListener(request.getHeadersListener());
newHttpRequest.setCookieListener(request.getCookieListener());
- //newHttpRequest.setPushListener(request.getPushListener());
StringBuilder hostAndPort = new StringBuilder();
hostAndPort.append(redirUrl.getHost());
if (redirUrl.getPort() != null) {
hostAndPort.append(':').append(redirUrl.getPort());
}
newHttpRequest.headers().set(HttpHeaderNames.HOST, hostAndPort.toString());
- logger.log(Level.INFO, "redirect url: " + redirUrl +
+ logger.log(Level.FINE, "redirect url: " + redirUrl +
" old request: " + request.toString() +
" new request: " + newHttpRequest.toString());
return newHttpRequest;
}
break;
default:
- logger.log(Level.FINE, "no redirect because of status code " + status);
break;
}
}
} catch (MalformedInputException | UnmappableCharacterException e) {
- logger.log(Level.WARNING, e.getMessage(), e);
+ this.throwable = e;
}
return null;
}
- protected Request fromStreamId(Integer streamId) {
- if (streamId == null) {
- streamId = requests.lastKey();
+ protected Request retry(Request request, FullHttpResponse httpResponse) {
+ if (httpResponse == null) {
+ return null;
}
- return requests.get(streamId);
+ if (request == null) {
+ // push promise or something else
+ return null;
+ }
+ if (request.isBackOff()) {
+ BackOff backOff = request.getBackOff() != null ? request.getBackOff() :
+ client.getClientConfig().getBackOff();
+ int status = httpResponse.status().code();
+ switch (status) {
+ case 403:
+ case 404:
+ case 500:
+ case 502:
+ case 503:
+ case 504:
+ case 507:
+ case 509:
+ if (backOff != null) {
+ long millis = backOff.nextBackOffMillis();
+ if (millis != BackOff.STOP) {
+ logger.log(Level.FINE, "status = " + status + " backing off request by " + millis + " milliseconds");
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ return request;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ return null;
}
public void setCookieBox(Map cookieBox) {
@@ -209,7 +300,7 @@ abstract class BaseTransport implements Transport {
return cookieBox;
}
- public void addCookie(Cookie cookie) {
+ private void addCookie(Cookie cookie) {
if (cookieBox == null) {
this.cookieBox = Collections.synchronizedMap(new LRUCache(32));
}
@@ -241,7 +332,8 @@ abstract class BaseTransport implements Transport {
return (secureScheme && cookie.isSecure()) || (!secureScheme && !cookie.isSecure());
}
- class LRUCache extends LinkedHashMap {
+ @SuppressWarnings("serial")
+ static class LRUCache extends LinkedHashMap {
private final int cacheSize;
diff --git a/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java
index 4edb3a4..69ccd5a 100644
--- a/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java
+++ b/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java
@@ -2,18 +2,12 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
-import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
-import org.xbib.netty.http.client.listener.CookieListener;
-import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
@@ -27,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class Http2Transport extends BaseTransport implements Transport {
+public class Http2Transport extends BaseTransport {
private static final Logger logger = Logger.getLogger(Http2Transport.class.getName());
@@ -41,7 +35,9 @@ public class Http2Transport extends BaseTransport implements Transport {
super(client, httpAddress);
streamIdCounter = new AtomicInteger(3);
streamidPromiseMap = new ConcurrentSkipListMap<>();
- settingsPromise = new CompletableFuture<>();
+ settingsPromise = (httpAddress != null && httpAddress.isSecure()) ||
+ (client.hasPooledConnections() && client.getPool().isSecure()) ?
+ new CompletableFuture<>() : null;
}
@Override
@@ -69,62 +65,47 @@ public class Http2Transport extends BaseTransport implements Transport {
public void awaitSettings() {
if (settingsPromise != null) {
try {
- settingsPromise.get(client.getTimeout(), TimeUnit.MILLISECONDS);
+ settingsPromise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
settingsPromise.completeExceptionally(e);
}
- } else {
- logger.log(Level.WARNING, "waiting for settings but no promise present");
}
}
@Override
public void responseReceived(Integer streamId, FullHttpResponse fullHttpResponse) {
if (streamId == null) {
- logger.log(Level.WARNING, "unexpected message received: " + fullHttpResponse);
+ logger.log(Level.WARNING, "no stream ID, unexpected message received: " + fullHttpResponse);
return;
}
CompletableFuture promise = streamidPromiseMap.get(streamId);
if (promise == null) {
- logger.log(Level.WARNING, "response received for unknown stream id " + streamId);
- } else {
- Request request = fromStreamId(streamId);
- if (request != null) {
- HttpResponseListener responseListener = request.getResponseListener();
- if (responseListener != null) {
- responseListener.onResponse(fullHttpResponse);
- }
- try {
- request = continuation(streamId, fullHttpResponse);
- if (request != null) {
- // synchronous call here
- client.continuation(this, request);
- }
- } catch (URLSyntaxException | IOException e) {
- logger.log(Level.WARNING, e.getMessage(), e);
- }
- }
- // complete origin
- promise.complete(true);
+ logger.log(Level.WARNING, "response received for stream ID " + streamId + " but found no promise");
+ return;
}
- }
-
- @Override
- public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
Request request = fromStreamId(streamId);
if (request != null) {
- HttpHeadersListener httpHeadersListener = request.getHeadersListener();
- if (httpHeadersListener != null) {
- httpHeadersListener.onHeaders(httpHeaders);
+ HttpResponseListener responseListener = request.getResponseListener();
+ if (responseListener != null) {
+ responseListener.onResponse(fullHttpResponse);
}
- CookieListener cookieListener = request.getCookieListener();
- if (cookieListener != null) {
- for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
- Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
- cookieListener.onCookie(cookie);
+ try {
+ Request retryRequest = retry(request, fullHttpResponse);
+ if (retryRequest != null) {
+ // retry transport, wait for completion
+ client.retry(this, retryRequest);
+ } else {
+ Request continueRequest = continuation(request, fullHttpResponse);
+ if (continueRequest != null) {
+ // continue with new transport, synchronous call here, wait for completion
+ client.continuation(this, continueRequest);
+ }
}
+ } catch (URLSyntaxException | IOException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
}
}
+ promise.complete(true);
}
@Override
@@ -134,16 +115,25 @@ public class Http2Transport extends BaseTransport implements Transport {
}
@Override
- public void awaitResponse(Integer streamId) {
+ public void awaitResponse(Integer streamId) throws IOException {
if (streamId == null) {
return;
}
+ if (throwable != null) {
+ return;
+ }
CompletableFuture promise = streamidPromiseMap.get(streamId);
if (promise != null) {
try {
- promise.get(client.getTimeout(), TimeUnit.MILLISECONDS);
+ long millis = client.getClientConfig().getReadTimeoutMillis();
+ Request request = fromStreamId(streamId);
+ if (request != null && request.getTimeoutInMillis() > 0) {
+ millis = request.getTimeoutInMillis();
+ }
+ promise.get(millis, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- logger.log(Level.WARNING, "streamId=" + streamId + " " + e.getMessage(), e);
+ this.throwable = e;
+ throw new IOException(e);
} finally {
streamidPromiseMap.remove(streamId);
}
@@ -153,7 +143,14 @@ public class Http2Transport extends BaseTransport implements Transport {
@Override
public Transport get() {
for (Integer streamId : streamidPromiseMap.keySet()) {
- awaitResponse(streamId);
+ try {
+ awaitResponse(streamId);
+ } catch (IOException e) {
+ notifyRequest(streamId, e);
+ }
+ }
+ if (throwable != null) {
+ streamidPromiseMap.clear();
}
return this;
}
@@ -165,10 +162,32 @@ public class Http2Transport extends BaseTransport implements Transport {
}
}
+ /**
+ * The underlying network layer failed, not possible to know the request.
+ * So we fail all (open) promises.
+ * @param throwable the exception
+ */
@Override
public void fail(Throwable throwable) {
+ // fail fast, do not fail more than once
+ if (this.throwable != null) {
+ return;
+ }
+ this.throwable = throwable;
for (CompletableFuture promise : streamidPromiseMap.values()) {
promise.completeExceptionally(throwable);
}
}
+
+ /**
+ * Try to notify request about failure.
+ * @param streamId stream ID
+ * @param throwable the exception
+ */
+ private void notifyRequest(Integer streamId, Throwable throwable) {
+ Request request = fromStreamId(streamId);
+ if (request != null && request.getCompletableFuture() != null) {
+ request.getCompletableFuture().completeExceptionally(throwable);
+ }
+ }
}
diff --git a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java
index 1356be4..94b0dd2 100644
--- a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java
+++ b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java
@@ -2,18 +2,12 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
-import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
-import org.xbib.netty.http.client.listener.CookieListener;
-import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
@@ -27,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class HttpTransport extends BaseTransport implements Transport {
+public class HttpTransport extends BaseTransport {
private static final Logger logger = Logger.getLogger(HttpTransport.class.getName());
@@ -43,7 +37,7 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override
public Integer nextStream() {
- Integer streamId = sequentialCounter.getAndAdd(1);
+ Integer streamId = sequentialCounter.getAndIncrement();
if (streamId == Integer.MIN_VALUE) {
// reset if overflow, Java wraps atomic integers to Integer.MIN_VALUE
sequentialCounter.set(0);
@@ -71,9 +65,18 @@ public class HttpTransport extends BaseTransport implements Transport {
}
}
try {
- request = continuation(null, fullHttpResponse);
- if (request != null) {
- client.continuation(this, request);
+ Request retryRequest = retry(request, fullHttpResponse);
+ if (retryRequest != null) {
+ // retry transport, wait for completion
+ client.retry(this, retryRequest);
+ retryRequest.close();
+ } else {
+ Request continueRequest = continuation(request, fullHttpResponse);
+ if (continueRequest != null) {
+ // continue with new transport, synchronous call here, wait for completion
+ client.continuation(this, continueRequest);
+ continueRequest.close();
+ }
}
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
@@ -86,40 +89,25 @@ public class HttpTransport extends BaseTransport implements Transport {
}
}
- @Override
- public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
- Request request = fromStreamId(streamId);
- if (request != null) {
- HttpHeadersListener httpHeadersListener = request.getHeadersListener();
- if (httpHeadersListener != null) {
- httpHeadersListener.onHeaders(httpHeaders);
- }
- for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
- Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
- addCookie(cookie);
- CookieListener cookieListener = request.getCookieListener();
- if (cookieListener != null) {
- cookieListener.onCookie(cookie);
- }
- }
- }
- }
-
@Override
public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) {
}
@Override
- public void awaitResponse(Integer streamId) {
+ public void awaitResponse(Integer streamId) throws IOException {
if (streamId == null) {
return;
}
+ if (throwable != null) {
+ return;
+ }
CompletableFuture promise = sequentialPromiseMap.get(streamId);
if (promise != null) {
try {
- promise.get(client.getTimeout(), TimeUnit.MILLISECONDS);
+ promise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- logger.log(Level.WARNING, "streamId=" + streamId + " " + e.getMessage(), e);
+ this.throwable = e;
+ throw new IOException(e);
} finally {
sequentialPromiseMap.remove(streamId);
}
@@ -128,8 +116,15 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override
public Transport get() {
- for (Integer streamId : sequentialPromiseMap.keySet()) {
- awaitResponse(streamId);
+ try {
+ for (Integer streamId : sequentialPromiseMap.keySet()) {
+ awaitResponse(streamId);
+ client.releaseChannel(channel);
+ }
+ } catch (IOException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ } finally {
+ sequentialPromiseMap.clear();
}
return this;
}
@@ -143,9 +138,9 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override
public void fail(Throwable throwable) {
+ this.throwable = throwable;
for (CompletableFuture promise : sequentialPromiseMap.values()) {
promise.completeExceptionally(throwable);
}
}
-
}
diff --git a/src/main/java/org/xbib/netty/http/client/transport/Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Transport.java
index 86aec30..0fca5c1 100644
--- a/src/main/java/org/xbib/netty/http/client/transport/Transport.java
+++ b/src/main/java/org/xbib/netty/http/client/transport/Transport.java
@@ -7,7 +7,6 @@ import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.AttributeKey;
-import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
@@ -19,8 +18,6 @@ public interface Transport {
AttributeKey TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport");
- HttpAddress httpAddress();
-
Transport execute(Request request) throws IOException;
CompletableFuture execute(Request request, Function supplier) throws IOException;
@@ -41,7 +38,7 @@ public interface Transport {
void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers);
- void awaitResponse(Integer streamId);
+ void awaitResponse(Integer streamId) throws IOException;
Transport get();
@@ -49,5 +46,9 @@ public interface Transport {
void fail(Throwable throwable);
- void close();
+ boolean isFailed();
+
+ Throwable getFailure();
+
+ void close() throws IOException;
}
diff --git a/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java b/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java
index 5ffca22..9744621 100644
--- a/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java
+++ b/src/main/java/org/xbib/netty/http/client/util/NetworkUtils.java
@@ -395,10 +395,8 @@ public class NetworkUtils {
if (predicate.test(networkInterface)) {
networkInterfaces.add(networkInterface);
Enumeration subInterfaces = networkInterface.getSubInterfaces();
- if (subInterfaces.hasMoreElements()) {
- while (subInterfaces.hasMoreElements()) {
- networkInterfaces.add(subInterfaces.nextElement());
- }
+ while (subInterfaces.hasMoreElements()) {
+ networkInterfaces.add(subInterfaces.nextElement());
}
}
}
diff --git a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java
index 8aa5261..e198e36 100644
--- a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java
@@ -26,7 +26,7 @@ public class CompletableFutureTest {
final Function httpResponseStringFunction = response ->
response.content().toString(StandardCharsets.UTF_8);
Request request = Request.get()
- .url("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml")
+ .url("https://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1")
.build();
CompletableFuture completableFuture = client.execute(request, httpResponseStringFunction)
.exceptionally(Throwable::getMessage)
diff --git a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java
index 3e974dc..9ccbe50 100644
--- a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java
@@ -17,15 +17,14 @@ public class ConscryptTest extends LoggingBase {
@Test
public void testConscrypt() throws IOException {
Client client = Client.builder()
- .enableDebug()
.setJdkSslProvider()
.setSslContextProvider(Conscrypt.newProvider())
.build();
logger.log(Level.INFO, client.getClientConfig().toString());
try {
Request request = Request.get()
- .url("https://fl-test.hbz-nrw.de")
- .setVersion("HTTP/2.0")
+ .url("https://xbib.org")
+ .setVersion("HTTP/1.1")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
diff --git a/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java b/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java
index d0970b2..5f2e788 100644
--- a/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java
@@ -26,7 +26,7 @@ public class CookieSetterHttpBinTest extends LoggingBase {
* }
* }
*
- * @throws Exception
+ * @throws IOException if test fails
*/
@Test
public void testHttpBinCookies() throws IOException {
diff --git a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java
index 7e53223..b4f6f97 100644
--- a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java
@@ -83,7 +83,7 @@ public class ElasticsearchTest extends LoggingBase {
.build()
.setResponseListener(fullHttpResponse ->
logger.log(Level.FINE, "status = " + fullHttpResponse.status() +
- " counter = " + count.incrementAndGet() +
+ " counter = " + count.getAndIncrement() +
" response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8)));
}
diff --git a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java
index f01e312..09f51d4 100644
--- a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java
+++ b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java
@@ -1,24 +1,34 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
+import org.junit.After;
+import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class Http1Test {
+public class Http1Test extends LoggingBase {
private static final Logger logger = Logger.getLogger(Http1Test.class.getName());
+ @After
+ public void checkThreads() {
+ Set threadSet = Thread.getAllStackTraces().keySet();
+ logger.log(Level.INFO, "threads = " + threadSet.size() );
+ threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString()));
+ }
+
@Test
public void testHttp1() throws Exception {
Client client = new Client();
try {
- Request request = Request.get().url("http://fl.hbz-nrw.de").build()
+ Request request = Request.get().url("http://xbib.org").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) +
@@ -30,26 +40,29 @@ public class Http1Test {
}
@Test
- public void testHttp1ParallelRequests() throws IOException {
- Client client = new Client();
+ @Ignore
+ public void testParallelRequests() throws IOException {
+ Client client = Client.builder().enableDebug().build();
try {
Request request1 = Request.builder(HttpMethod.GET)
- .url("http://fl.hbz-nrw.de").setVersion("HTTP/1.1")
+ .url("http://xbib.org").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
Request request2 = Request.builder(HttpMethod.GET)
- .url("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1")
+ .url("http://xbib.org").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
- client.execute(request1);
- client.execute(request2);
+ for (int i = 0; i < 10; i++) {
+ client.execute(request1);
+ client.execute(request2);
+ }
} finally {
client.shutdownGracefully();
@@ -57,7 +70,8 @@ public class Http1Test {
}
@Test
- public void testTwoTransports() throws Exception {
+ @Ignore
+ public void testSequentialRequests() throws Exception {
Client client = Client.builder().enableDebug().build();
try {
Request request1 = Request.get().url("http://xbib.org").build()
diff --git a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java
index 1c2d2ac..94c4cad 100644
--- a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java
+++ b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java
@@ -11,25 +11,39 @@ import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class Http2Test {
+public class Http2Test extends LoggingBase {
private static final Logger logger = Logger.getLogger(Http2Test.class.getName());
/**
+ * Problems with akamai:
+ *
+ * 2018-03-07 16:02:52.385 FEIN [client] io.netty.handler.codec.http2.Http2FrameLogger logRstStream
+ * [id: 0x57cc65bb, L:/10.1.1.94:52834 - R:http2.akamai.com/104.94.191.203:443] INBOUND RST_STREAM: streamId=2 errorCode=8
+ * 2018-03-07 16:02:52.385 FEIN [client] io.netty.handler.codec.http2.Http2FrameLogger logGoAway
+ * [id: 0x57cc65bb, L:/10.1.1.94:52834 - R:http2.akamai.com/104.94.191.203:443] OUTBOUND GO_AWAY: lastStreamId=2 errorCode=0 length=0 bytes=
+ *
+ * demo/h2_demo_frame.html sends no content, only a push promise, and does not continue
+ *
+ * @throws IOException
*/
@Test
+ @Ignore
public void testAkamai() throws IOException {
- Client client = Client.builder().enableDebug().build();
+ Client client = Client.builder()
+ .enableDebug()
+ .addServerNameForIdentification("http2.akamai.com")
+ .build();
try {
Request request = Request.get()
.url("https://http2.akamai.com/demo/h2_demo_frame.html")
//.url("https://http2.akamai.com/")
.setVersion("HTTP/2.0")
.build()
- .setResponseListener(fullHttpResponse -> {
- String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
- logger.log(Level.INFO, "status = " + fullHttpResponse.status()
- + " response body = " + response);
+ .setResponseListener(msg -> {
+ String response = msg.content().toString(StandardCharsets.UTF_8);
+ logger.log(Level.INFO, "status = " + msg.status() +
+ msg.headers().entries() + " " + response);
});
client.execute(request).get();
} finally {
@@ -55,17 +69,18 @@ public class Http2Test {
@Test
public void testHttp2PushIO() throws IOException {
- //String url = "https://webtide.com";
String url = "https://http2-push.io";
- // TODO register push announces into promises in order to wait for them all.
- Client client = Client.builder().enableDebug().build();
+ Client client = Client.builder()
+ .enableDebug()
+ .addServerNameForIdentification("http2-push.io")
+ .build();
try {
Request request = Request.builder(HttpMethod.GET)
.url(url).setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
- msg.content().toString(StandardCharsets.UTF_8) +
+ //msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request).get();
@@ -75,7 +90,7 @@ public class Http2Test {
}
@Test
- public void testWebtideTwoRequestsOnSameConnection() {
+ public void testWebtideTwoRequestsOnSameConnection() throws IOException {
Client client = new Client();
try {
Request request1 = Request.builder(HttpMethod.GET)
@@ -95,8 +110,6 @@ public class Http2Test {
" status=" + msg.status().code()));
client.execute(request1).execute(request2);
- } catch (IOException e) {
- //
} finally {
client.shutdownGracefully();
}
diff --git a/src/test/java/org/xbib/netty/http/client/test/LeakTest.java b/src/test/java/org/xbib/netty/http/client/test/LeakTest.java
new file mode 100644
index 0000000..b8361d0
--- /dev/null
+++ b/src/test/java/org/xbib/netty/http/client/test/LeakTest.java
@@ -0,0 +1,28 @@
+package org.xbib.netty.http.client.test;
+
+import org.junit.After;
+import org.junit.Test;
+import org.xbib.netty.http.client.Client;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class LeakTest {
+
+ private static final Logger logger = Logger.getLogger(LeakTest.class.getName());
+
+ @After
+ public void checkThreads() {
+ Set threadSet = Thread.getAllStackTraces().keySet();
+ logger.log(Level.INFO, "threads = " + threadSet.size() );
+ threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString()));
+ }
+
+ @Test
+ public void testForLeaks() throws IOException, InterruptedException {
+ Client client = new Client();
+ client.shutdownGracefully();
+ }
+}
diff --git a/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java b/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java
index ad57a4e..e734d15 100644
--- a/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java
+++ b/src/test/java/org/xbib/netty/http/client/test/LoggingBase.java
@@ -17,7 +17,7 @@ public class LoggingBase {
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
- rootLogger.setLevel(Level.INFO);
+ rootLogger.setLevel(Level.ALL);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.ALL);
diff --git a/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java b/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java
new file mode 100644
index 0000000..1ef9c34
--- /dev/null
+++ b/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java
@@ -0,0 +1,65 @@
+package org.xbib.netty.http.client.test;
+
+import org.junit.Test;
+import org.xbib.netty.http.client.Client;
+import org.xbib.netty.http.client.HttpAddress;
+import org.xbib.netty.http.client.Request;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class PooledClientTest extends LoggingBase {
+
+ private static final Logger logger = Logger.getLogger("");
+
+ @Test
+ public void testPooledClientWithSingleNode() throws IOException {
+ int loop = 10;
+ HttpAddress httpAddress = HttpAddress.http1("xbib.org", 80);
+ Client client = Client.builder()
+ .addPoolNode(httpAddress)
+ .setPoolSecure(httpAddress.isSecure())
+ .setPoolNodeConnectionLimit(16)
+ .build();
+ AtomicInteger count = new AtomicInteger();
+ try {
+ int threads = 16;
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ for (int n = 0; n < threads; n++) {
+ executorService.submit(() -> {
+ try {
+ logger.log(Level.INFO, "starting " + Thread.currentThread());
+ for (int i = 0; i < loop; i++) {
+ Request request = Request.get()
+ .url("http://xbib.org/repository/")
+ .setVersion("HTTP/1.1")
+ .build()
+ .setResponseListener(fullHttpResponse -> {
+ String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
+ //logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
+ });
+ client.pooledExecute(request).get();
+ count.getAndIncrement();
+ }
+ logger.log(Level.INFO, "done " + Thread.currentThread());
+ } catch (IOException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ } finally {
+ client.shutdownGracefully();
+ }
+ logger.log(Level.INFO, "count = " + count.get());
+ }
+}
diff --git a/src/test/java/org/xbib/netty/http/client/test/XbibTest.java b/src/test/java/org/xbib/netty/http/client/test/XbibTest.java
index 4584329..cb011cb 100644
--- a/src/test/java/org/xbib/netty/http/client/test/XbibTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/XbibTest.java
@@ -88,11 +88,11 @@ public class XbibTest extends LoggingBase {
@Test
public void testXbibOrgWithVeryShortReadTimeout() throws IOException {
Client httpClient = Client.builder()
- .setReadTimeoutMillis(50)
.build();
try {
httpClient.execute(Request.get()
.url("http://xbib.org")
+ .setTimeoutInMillis(10)
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java
index 8c04cf4..658b8e1 100644
--- a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java
@@ -13,13 +13,14 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpVersion;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool;
-import org.xbib.netty.http.client.pool.SimpleChannelPool;
+import org.xbib.netty.http.client.pool.BoundedChannelPool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -34,6 +35,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
+@Ignore
public class EpollTest {
private static final Logger logger = Logger.getLogger(EpollTest.class.getName());
@@ -74,7 +76,8 @@ public class EpollTest {
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
- channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0);
+ channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,false,
+ NODES, bootstrap, null, 0);
channelPool.prepare(CONCURRENCY);
}
@@ -85,7 +88,6 @@ public class EpollTest {
mockEpollServer.close();
}
- @Ignore
@Test
public void testPoolEpoll() throws Exception {
LongAdder longAdder = new LongAdder();
diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java
index e65e767..5be5cbc 100644
--- a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java
@@ -12,12 +12,13 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpVersion;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool;
-import org.xbib.netty.http.client.pool.SimpleChannelPool;
+import org.xbib.netty.http.client.pool.BoundedChannelPool;
import java.util.Collections;
import java.util.List;
@@ -72,7 +73,8 @@ public class NioTest {
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
- channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0);
+ channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,false,
+ NODES, bootstrap, null, 0);
channelPool.prepare(CONCURRENCY);
}
diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java
similarity index 91%
rename from src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java
rename to src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java
index 9351230..dcd6738 100644
--- a/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java
@@ -2,6 +2,14 @@ package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.AttributeKey;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.xbib.netty.http.client.HttpAddress;
+import org.xbib.netty.http.client.pool.BoundedChannelPool;
+import org.xbib.netty.http.client.pool.Pool;
import java.util.ArrayList;
import java.util.Arrays;
@@ -17,21 +25,13 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
-import io.netty.util.AttributeKey;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.xbib.netty.http.client.HttpAddress;
-import org.xbib.netty.http.client.pool.Pool;
-import org.xbib.netty.http.client.pool.SimpleChannelPool;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
-public class SimplePoolTest {
+public class PoolTest {
- private static final Logger logger = Logger.getLogger(SimplePoolTest.class.getName());
+ private static final Logger logger = Logger.getLogger(PoolTest.class.getName());
private static final int TEST_STEP_TIME_SECONDS = 50;
@@ -51,14 +51,14 @@ public class SimplePoolTest {
});
}
- public SimplePoolTest(int concurrencyLevel, int nodeCount) {
+ public PoolTest(int concurrencyLevel, int nodeCount) {
this.nodeCount = nodeCount;
List nodes = new ArrayList<>();
for (int i = 0; i < nodeCount; i ++) {
nodes.add(HttpAddress.http1("localhost" + i));
}
- try (Pool pool = new SimpleChannelPool<>(new Semaphore(concurrencyLevel), nodes, new Bootstrap(),
- null, 0)) {
+ try (Pool pool = new BoundedChannelPool<>(new Semaphore(concurrencyLevel), HttpVersion.HTTP_1_1, false,
+ nodes, new Bootstrap(), null, 0)) {
int n = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(n);
for(int i = 0; i < n; i ++) {
diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java b/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java
index 3b8e4c4..17621d8 100644
--- a/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java
@@ -3,7 +3,6 @@ package org.xbib.netty.http.client.test.retry;
import org.junit.Test;
import org.xbib.netty.http.client.retry.BackOff;
import org.xbib.netty.http.client.retry.ExponentialBackOff;
-import org.xbib.netty.http.client.retry.NanoClock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -136,7 +135,7 @@ public class ExponentialBackOffTest {
assertEquals(testMaxInterval, backOffPolicy.getCurrentIntervalMillis());
}
- static class MyNanoClock implements NanoClock {
+ static class MyNanoClock implements ExponentialBackOff.NanoClock {
private int i = 0;
private long startSeconds;
diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java
index 9fa01f6..e72c4db 100644
--- a/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java
+++ b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java
@@ -23,11 +23,13 @@ public class MockBackOff implements BackOff {
/** Number of tries so far. */
private int numTries;
- public void reset() throws IOException {
+ @Override
+ public void reset() {
numTries = 0;
}
- public long nextBackOffMillis() throws IOException {
+ @Override
+ public long nextBackOffMillis() {
if (numTries >= maxTries || backOffMillis == STOP) {
return STOP;
}
diff --git a/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java b/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java
index 5065847..fea4833 100644
--- a/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java
+++ b/src/test/java/org/xbib/netty/http/client/test/simple/SimpleHttp1Test.java
@@ -1,11 +1,9 @@
package org.xbib.netty.http.client.test.simple;
import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -20,11 +18,8 @@ import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http2.Http2Settings;
-import io.netty.handler.codec.http2.HttpConversionUtil;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey;
+import org.junit.After;
import org.junit.Test;
import java.io.IOException;
@@ -32,6 +27,7 @@ import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -46,17 +42,26 @@ public class SimpleHttp1Test {
private static final Logger logger = Logger.getLogger(SimpleHttp1Test.class.getName());
+ @After
+ public void checkThreads() {
+ Set threadSet = Thread.getAllStackTraces().keySet();
+ logger.log(Level.INFO, "threads = " + threadSet.size() );
+ threadSet.forEach( thread -> {
+ if (thread.getName().equals("ObjectCleanerThread")) {
+ logger.log(Level.INFO, thread.toString());
+ }
+ });
+ }
+
@Test
public void testHttp1() throws Exception {
Client client = new Client();
try {
- HttpTransport transport = client.newTransport("fl.hbz-nrw.de", 80);
+ HttpTransport transport = client.newTransport("xbib.org", 80);
transport.onResponse(string -> logger.log(Level.INFO, "got messsage: " + string));
transport.connect();
- transport.awaitSettings();
sendRequest(transport);
- transport.awaitResponses();
- transport.close();
+ transport.awaitResponse();
} finally {
client.shutdown();
}
@@ -67,20 +72,18 @@ public class SimpleHttp1Test {
if (channel == null) {
return;
}
- Integer streamId = transport.nextStream();
String host = transport.inetSocketAddress().getHostString();
int port = transport.inetSocketAddress().getPort();
- String uri = "https://" + host + ":" + port;
+ String uri = "http://" + host + ":" + port;
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
request.headers().add(HttpHeaderNames.HOST, host + ":" + port);
request.headers().add(HttpHeaderNames.USER_AGENT, "Java");
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE);
- if (streamId != null) {
- request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
- }
logger.log(Level.INFO, () -> "writing request = " + request);
- channel.writeAndFlush(request);
+ if (channel.isWritable()) {
+ channel.writeAndFlush(request);
+ }
}
private AttributeKey TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport");
@@ -115,16 +118,14 @@ public class SimpleHttp1Test {
return bootstrap;
}
- Initializer initializer() {
- return initializer;
- }
-
- HttpResponseHandler responseHandler() {
- return httpResponseHandler;
- }
-
void shutdown() {
+ close();
eventLoopGroup.shutdownGracefully();
+ try {
+ eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ }
}
HttpTransport newTransport(String host, int port) {
@@ -133,18 +134,11 @@ public class SimpleHttp1Test {
return transport;
}
- List transports() {
- return transports;
- }
-
- void close(HttpTransport transport) {
- transports.remove(transport);
- }
-
- void close() {
+ synchronized void close() {
for (HttpTransport transport : transports) {
transport.close();
}
+ transports.clear();
}
}
@@ -165,10 +159,6 @@ public class SimpleHttp1Test {
this.inetSocketAddress = inetSocketAddress;
}
- Client client() {
- return client;
- }
-
InetSocketAddress inetSocketAddress() {
return inetSocketAddress;
}
@@ -176,52 +166,33 @@ public class SimpleHttp1Test {
void connect() throws InterruptedException {
channel = client.bootstrap().connect(inetSocketAddress).sync().await().channel();
channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
+ promise = new CompletableFuture<>();
}
Channel channel() {
return channel;
}
- Integer nextStream() {
- promise = new CompletableFuture<>();
- return null;
- }
-
void onResponse(ResponseWriter responseWriter) {
this.responseWriter = responseWriter;
}
- void settingsReceived(Channel channel, Http2Settings http2Settings) {
- }
-
- void awaitSettings() {
- }
-
- void responseReceived(Integer streamId, String message) {
- if (promise == null) {
- logger.log(Level.WARNING, "message received for unknown stream id " + streamId);
- } else {
- if (responseWriter != null) {
- responseWriter.write(message);
- }
+ void responseReceived(FullHttpResponse msg) {
+ if (responseWriter != null) {
+ responseWriter.write(msg.content().toString(StandardCharsets.UTF_8));
}
}
- void awaitResponse(Integer streamId) {
+
+ void awaitResponse() {
if (promise != null) {
try {
- logger.log(Level.INFO, "waiting for response");
promise.get(5, TimeUnit.SECONDS);
- logger.log(Level.INFO, "response received");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
}
}
- void awaitResponses() {
- awaitResponse(null);
- }
-
void complete() {
if (promise != null) {
promise.complete(true);
@@ -238,7 +209,6 @@ public class SimpleHttp1Test {
if (channel != null) {
channel.close();
}
- client.close(this);
}
}
@@ -252,7 +222,6 @@ public class SimpleHttp1Test {
@Override
protected void initChannel(SocketChannel ch) {
- ch.pipeline().addLast(new TrafficLoggingHandler());
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1048576));
ch.pipeline().addLast(httpResponseHandler);
@@ -265,7 +234,7 @@ public class SimpleHttp1Test {
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
HttpTransport transport = ctx.channel().attr(TRANSPORT_ATTRIBUTE_KEY).get();
if (msg.content().isReadable()) {
- transport.responseReceived(null, msg.content().toString(StandardCharsets.UTF_8));
+ transport.responseReceived(msg);
}
}
@@ -290,35 +259,4 @@ public class SimpleHttp1Test {
ctx.channel().close();
}
}
-
- class TrafficLoggingHandler extends LoggingHandler {
-
- TrafficLoggingHandler() {
- super("client", LogLevel.INFO);
- }
-
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) {
- ctx.fireChannelRegistered();
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) {
- ctx.fireChannelUnregistered();
- }
-
- @Override
- public void flush(ChannelHandlerContext ctx) {
- ctx.flush();
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
- ctx.write(msg, promise);
- } else {
- super.write(ctx, msg, promise);
- }
- }
- }
}