add pool implementation, back-off for retry, cleaning up

This commit is contained in:
Jörg Prante 2018-03-08 15:57:17 +01:00
parent edb85cfdd8
commit 8593996f2b
40 changed files with 1404 additions and 847 deletions

View file

@ -2,6 +2,7 @@ import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
plugins { plugins {
id "com.github.spotbugs" version "1.6.1"
id "org.sonarqube" version "2.6.1" id "org.sonarqube" version "2.6.1"
id "io.codearte.nexus-staging" version "0.11.0" id "io.codearte.nexus-staging" version "0.11.0"
id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.0" id "org.xbib.gradle.plugin.asciidoctor" version "1.6.0.0"
@ -23,10 +24,10 @@ printf "Date: %s\nHost: %s\nOS: %s %s %s\nJVM: %s %s %s %s\nGradle: %s Groovy: %
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'maven' apply plugin: 'maven'
apply plugin: 'signing' apply plugin: 'signing'
apply plugin: "com.github.spotbugs"
apply plugin: "io.codearte.nexus-staging" apply plugin: "io.codearte.nexus-staging"
apply plugin: 'org.xbib.gradle.plugin.asciidoctor' apply plugin: 'org.xbib.gradle.plugin.asciidoctor'
configurations { configurations {
alpnagent alpnagent
asciidoclet asciidoclet
@ -37,7 +38,7 @@ dependencies {
compile "org.xbib:net-url:${project.property('xbib-net-url.version')}" compile "org.xbib:net-url:${project.property('xbib-net-url.version')}"
compile "io.netty:netty-codec-http2:${project.property('netty.version')}" compile "io.netty:netty-codec-http2:${project.property('netty.version')}"
compile "io.netty:netty-handler-proxy:${project.property('netty.version')}" compile "io.netty:netty-handler-proxy:${project.property('netty.version')}"
testCompile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}" compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}" testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}"
testCompile "junit:junit:${project.property('junit.version')}" testCompile "junit:junit:${project.property('junit.version')}"
@ -66,7 +67,7 @@ test {
jvmArgs "-javaagent:" + configurations.alpnagent.asPath jvmArgs "-javaagent:" + configurations.alpnagent.asPath
} }
testLogging { testLogging {
showStandardStreams = false showStandardStreams = true
exceptionFormat = 'full' exceptionFormat = 'full'
} }
} }
@ -121,5 +122,109 @@ if (project.hasProperty('signing.keyId')) {
} }
} }
apply from: 'gradle/ext.gradle'
apply from: 'gradle/publish.gradle' spotbugs {
effort = "max"
reportLevel = "low"
//includeFilter = file("findbugs-exclude.xml")
}
tasks.withType(com.github.spotbugs.SpotBugsTask) {
ignoreFailures = true
reports {
xml.enabled = false
html.enabled = true
}
}
sonarqube {
properties {
property "sonar.projectName", "${project.group} ${project.name}"
property "sonar.sourceEncoding", "UTF-8"
property "sonar.tests", "src/test/java"
property "sonar.scm.provider", "git"
property "sonar.junit.reportsPath", "build/test-results/test/"
}
}
ext {
user = 'jprante'
name = 'netty-http-client'
description = 'A java client for Elasticsearch'
scmUrl = 'https://github.com/' + user + '/' + name
scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
}
task xbibUpload(type: Upload) {
group = 'publish'
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty("xbibUsername")) {
mavenDeployer {
configuration = configurations.wagon
repository(url: 'sftp://xbib.org/repository') {
authentication(userName: xbibUsername, privateKey: xbibPrivateKey)
}
}
}
}
}
task sonaTypeUpload(type: Upload) {
group = 'publish'
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty('ossrhUsername')) {
mavenDeployer {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') {
authentication(userName: ossrhUsername, password: ossrhPassword)
}
snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') {
authentication(userName: ossrhUsername, password: ossrhPassword)
}
pom.project {
groupId project.group
artifactId project.name
version project.version
name project.name
description description
packaging 'jar'
inceptionYear '2012'
url scmUrl
organization {
name 'xbib'
url 'http://xbib.org'
}
developers {
developer {
id user
name 'Jörg Prante'
email 'joergprante@gmail.com'
url 'https://github.com/jprante'
}
}
scm {
url scmUrl
connection scmConnection
developerConnection scmDeveloperConnection
}
licenses {
license {
name 'The Apache License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
}
}
}
}
}
nexusStaging {
packageGroup = "org.xbib"
}

View file

@ -1,8 +1,8 @@
group = org.xbib group = org.xbib
name = netty-http-client name = netty-http-client
version = 4.1.16.0 version = 4.1.22.2
netty.version = 4.1.16.Final netty.version = 4.1.22.Final
tcnative.version = 2.0.7.Final tcnative.version = 2.0.7.Final
conscrypt.version = 1.0.1 conscrypt.version = 1.0.1
xbib-net-url.version = 1.1.0 xbib-net-url.version = 1.1.0

View file

@ -1,9 +0,0 @@
ext {
user = 'jprante'
name = 'netty-http-client'
description = 'A java client for Elasticsearch'
scmUrl = 'https://github.com/' + user + '/' + name
scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
}

View file

@ -1,72 +0,0 @@
task xbibUpload(type: Upload) {
group = 'publish'
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty("xbibUsername")) {
mavenDeployer {
configuration = configurations.wagon
repository(url: 'sftp://xbib.org/repository') {
authentication(userName: xbibUsername, privateKey: xbibPrivateKey)
}
}
}
}
}
task sonaTypeUpload(type: Upload) {
group = 'publish'
configuration = configurations.archives
uploadDescriptor = true
repositories {
if (project.hasProperty('ossrhUsername')) {
mavenDeployer {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') {
authentication(userName: ossrhUsername, password: ossrhPassword)
}
snapshotRepository(url: 'https://oss.sonatype.org/content/repositories/snapshots') {
authentication(userName: ossrhUsername, password: ossrhPassword)
}
pom.project {
groupId project.group
artifactId project.name
version project.version
name project.name
description description
packaging 'jar'
inceptionYear '2012'
url scmUrl
organization {
name 'xbib'
url 'http://xbib.org'
}
developers {
developer {
id user
name 'Jörg Prante'
email 'joergprante@gmail.com'
url 'https://github.com/jprante'
}
}
scm {
url scmUrl
connection scmConnection
developerConnection scmDeveloperConnection
}
licenses {
license {
name 'The Apache License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
}
}
}
}
}
nexusStaging {
packageGroup = "org.xbib"
}

View file

@ -1,39 +0,0 @@
tasks.withType(FindBugs) {
ignoreFailures = true
reports {
xml.enabled = false
html.enabled = true
}
}
tasks.withType(Pmd) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
}
}
tasks.withType(Checkstyle) {
ignoreFailures = true
reports {
xml.enabled = true
html.enabled = true
}
}
jacocoTestReport {
reports {
xml.enabled = true
csv.enabled = false
}
}
sonarqube {
properties {
property "sonar.projectName", "${project.group} ${project.name}"
property "sonar.sourceEncoding", "UTF-8"
property "sonar.tests", "src/test/java"
property "sonar.scm.provider", "git"
property "sonar.java.coveragePlugin", "jacoco"
property "sonar.junit.reportsPath", "build/test-results/test/"
}
}

View file

@ -2,38 +2,54 @@ package org.xbib.netty.http.client;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import org.xbib.net.URL;
import org.xbib.netty.http.client.handler.http1.HttpChannelInitializer; import org.xbib.netty.http.client.handler.http1.HttpChannelInitializer;
import org.xbib.netty.http.client.handler.http1.HttpResponseHandler; import org.xbib.netty.http.client.handler.http1.HttpResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2ChannelInitializer; import org.xbib.netty.http.client.handler.http2.Http2ChannelInitializer;
import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler; import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2SettingsHandler; import org.xbib.netty.http.client.handler.http2.Http2SettingsHandler;
import org.xbib.netty.http.client.pool.Pool; import org.xbib.netty.http.client.pool.BoundedChannelPool;
import org.xbib.netty.http.client.pool.SimpleChannelPool;
import org.xbib.netty.http.client.transport.Http2Transport; import org.xbib.netty.http.client.transport.Http2Transport;
import org.xbib.netty.http.client.transport.HttpTransport; import org.xbib.netty.http.client.transport.HttpTransport;
import org.xbib.netty.http.client.transport.Transport; import org.xbib.netty.http.client.transport.Transport;
import org.xbib.netty.http.client.util.NetworkUtils; import org.xbib.netty.http.client.util.NetworkUtils;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import java.io.IOException; import java.io.IOException;
import java.security.KeyStoreException; import java.security.KeyStoreException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -45,7 +61,22 @@ public final class Client {
private static final ThreadFactory httpClientThreadFactory = new HttpClientThreadFactory(); private static final ThreadFactory httpClientThreadFactory = new HttpClientThreadFactory();
static { static {
NetworkUtils.extendSystemProperties(); if (System.getProperty("xbib.netty.http.client.extendsystemproperties") != null) {
NetworkUtils.extendSystemProperties();
}
// change Netty defaults to safer ones, but still allow override from arg line
if (System.getProperty("io.netty.noUnsafe") == null) {
System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
}
if (System.getProperty("io.netty.noKeySetOptimization") == null) {
System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true));
}
if (System.getProperty("io.netty.recycler.maxCapacity") == null) {
System.setProperty("io.netty.recycler.maxCapacity", Integer.toString(0));
}
if (System.getProperty("io.netty.leakDetection.level") == null) {
System.setProperty("io.netty.leakDetection.level", "advanced");
}
} }
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
@ -68,7 +99,7 @@ public final class Client {
private TransportListener transportListener; private TransportListener transportListener;
private Pool<Channel> pool; private BoundedChannelPool<HttpAddress> pool;
public Client() { public Client() {
this(new ClientConfig()); this(new ClientConfig());
@ -84,40 +115,50 @@ public final class Client {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
initializeTrustManagerFactory(clientConfig); initializeTrustManagerFactory(clientConfig);
this.byteBufAllocator = byteBufAllocator != null ? this.byteBufAllocator = byteBufAllocator != null ?
byteBufAllocator : PooledByteBufAllocator.DEFAULT; byteBufAllocator : ByteBufAllocator.DEFAULT;
this.eventLoopGroup = eventLoopGroup != null ? this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ?
eventLoopGroup : new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory); new EpollEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory) :
this.socketChannelClass = socketChannelClass != null ? new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory);
socketChannelClass : NioSocketChannel.class; this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ?
EpollSocketChannel.class : NioSocketChannel.class;
this.bootstrap = new Bootstrap() this.bootstrap = new Bootstrap()
.group(this.eventLoopGroup) .group(this.eventLoopGroup)
.channel(this.socketChannelClass) .channel(this.socketChannelClass)
//.option(ChannelOption.ALLOCATOR, byteBufAllocator)
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNodelay()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNodelay())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isKeepAlive()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isKeepAlive())
.option(ChannelOption.SO_REUSEADDR, clientConfig.isReuseAddr()) .option(ChannelOption.SO_REUSEADDR, clientConfig.isReuseAddr())
.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSendBufferSize()) .option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpReceiveBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getTcpReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.option(ChannelOption.ALLOCATOR, byteBufAllocator); .option(ChannelOption.WRITE_BUFFER_WATER_MARK, clientConfig.getWriteBufferWaterMark());
this.httpResponseHandler = new HttpResponseHandler(); this.httpResponseHandler = new HttpResponseHandler();
this.http2SettingsHandler = new Http2SettingsHandler(); this.http2SettingsHandler = new Http2SettingsHandler();
this.http2ResponseHandler = new Http2ResponseHandler(); this.http2ResponseHandler = new Http2ResponseHandler();
this.transports = new CopyOnWriteArrayList<>(); this.transports = new CopyOnWriteArrayList<>();
List<HttpAddress> nodes = clientConfig.getNodes(); if (hasPooledConnections()) {
if (nodes != null && !nodes.isEmpty()) { List<HttpAddress> nodes = clientConfig.getPoolNodes();
Integer limit = clientConfig.getNodeConnectionLimit(); Integer limit = clientConfig.getPoolNodeConnectionLimit();
if (limit == null || limit > nodes.size()) { if (limit == null || limit < 1) {
limit = nodes.size();
}
if (limit < 1) {
limit = 1; limit = 1;
} }
Semaphore semaphore = new Semaphore(limit); Semaphore semaphore = new Semaphore(limit);
Integer retries = clientConfig.getRetriesPerNode(); Integer retries = clientConfig.getRetriesPerPoolNode();
if (retries == null || retries < 0) { if (retries == null || retries < 0) {
retries = 0; retries = 0;
} }
this.pool = new SimpleChannelPool<>(semaphore, nodes, bootstrap, null, retries); ClientChannelPoolHandler clientChannelPoolHandler = new ClientChannelPoolHandler();
this.pool = new BoundedChannelPool<>(semaphore, clientConfig.getPoolVersion(),
clientConfig.isPoolSecure(), nodes, bootstrap, clientChannelPoolHandler, retries);
Integer nodeConnectionLimit = clientConfig.getPoolNodeConnectionLimit();
if (nodeConnectionLimit == null || nodeConnectionLimit == 0) {
nodeConnectionLimit = nodes.size();
}
try {
this.pool.prepare(nodeConnectionLimit);
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
} }
} }
@ -133,52 +174,115 @@ public final class Client {
return byteBufAllocator; return byteBufAllocator;
} }
public EventLoopGroup getEventLoopGroup() {
return eventLoopGroup;
}
public void setTransportListener(TransportListener transportListener) { public void setTransportListener(TransportListener transportListener) {
this.transportListener = transportListener; this.transportListener = transportListener;
} }
public boolean hasPooledConnections() {
return !clientConfig.getPoolNodes().isEmpty();
}
public BoundedChannelPool<HttpAddress> getPool() {
return pool;
}
public void logDiagnostics(Level level) { public void logDiagnostics(Level level) {
logger.log(level, () -> "OpenSSL available: " + OpenSsl.isAvailable() + logger.log(level, () -> "OpenSSL available: " + OpenSsl.isAvailable() +
" OpenSSL ALPN support: " + OpenSsl.isAlpnSupported() + " OpenSSL ALPN support: " + OpenSsl.isAlpnSupported() +
" Local host name: " + NetworkUtils.getLocalHostName("localhost")); " Local host name: " + NetworkUtils.getLocalHostName("localhost") +
" event loop group: " + eventLoopGroup +
" socket: " + socketChannelClass.getName() +
" allocator: " + byteBufAllocator.getClass().getName());
logger.log(level, NetworkUtils::displayNetworkInterfaces); logger.log(level, NetworkUtils::displayNetworkInterfaces);
} }
public int getTimeout() { public Transport newTransport() {
return clientConfig.getReadTimeoutMillis(); return newTransport(null);
}
public Transport newTransport(URL url, HttpVersion httpVersion) {
return newTransport(HttpAddress.of(url, httpVersion));
} }
public Transport newTransport(HttpAddress httpAddress) { public Transport newTransport(HttpAddress httpAddress) {
Transport transport; Transport transport = null;
if (httpAddress.getVersion().majorVersion() < 2) { if (httpAddress != null) {
transport = new HttpTransport(this, httpAddress); if (httpAddress.getVersion().majorVersion() == 1) {
} else { transport = new HttpTransport(this, httpAddress);
transport = new Http2Transport(this, httpAddress); } else {
transport = new Http2Transport(this, httpAddress);
}
} else if (hasPooledConnections()) {
if (pool.getVersion().majorVersion() == 1) {
transport = new HttpTransport(this, null);
} else {
transport = new Http2Transport(this, null);
}
} }
if (transportListener != null) { if (transport != null) {
transportListener.onOpen(transport); if (transportListener != null) {
transportListener.onOpen(transport);
}
transports.add(transport);
} }
transports.add(transport);
return transport; return transport;
} }
public Channel newChannel(HttpAddress httpAddress) throws InterruptedException { public Channel newChannel(HttpAddress httpAddress) throws IOException {
HttpVersion httpVersion = httpAddress.getVersion(); Channel channel = null;
ChannelInitializer<SocketChannel> initializer; if (httpAddress != null) {
Channel channel; HttpVersion httpVersion = httpAddress.getVersion();
if (httpVersion.majorVersion() < 2) { ChannelInitializer<SocketChannel> initializer;
initializer = new HttpChannelInitializer(clientConfig, httpAddress, httpResponseHandler); SslHandler sslHandler = newSslHandler(clientConfig, byteBufAllocator, httpAddress);
channel = bootstrap.handler(initializer) if (httpVersion.majorVersion() == 1) {
.connect(httpAddress.getInetSocketAddress()).sync().await().channel(); initializer = new HttpChannelInitializer(clientConfig, httpAddress,
sslHandler, httpResponseHandler);
} else {
initializer = new Http2ChannelInitializer(clientConfig, httpAddress,
sslHandler, http2SettingsHandler, http2ResponseHandler);
}
try {
channel = bootstrap.handler(initializer)
.connect(httpAddress.getInetSocketAddress()).sync().await().channel();
} catch (InterruptedException e) {
throw new IOException(e);
}
} else { } else {
initializer = new Http2ChannelInitializer(clientConfig, httpAddress, if (hasPooledConnections()) {
http2SettingsHandler, http2ResponseHandler); try {
channel = bootstrap.handler(initializer) channel = pool.acquire();
.connect(httpAddress.getInetSocketAddress()).sync().await().channel(); } catch (Exception e) {
throw new IOException(e);
}
} else {
throw new UnsupportedOperationException();
}
} }
return channel; return channel;
} }
public Channel newChannel() throws IOException {
return newChannel(null);
}
public void releaseChannel(Channel channel) throws IOException{
if (hasPooledConnections()) {
try {
pool.release(channel);
} catch (Exception e) {
throw new IOException(e);
}
} else {
if (channel != null) {
channel.close();
}
}
}
public Transport execute(Request request) throws IOException { public Transport execute(Request request) throws IOException {
Transport transport = newTransport(HttpAddress.of(request)); Transport transport = newTransport(HttpAddress.of(request));
transport.execute(request); transport.execute(request);
@ -190,8 +294,14 @@ public final class Client {
return newTransport(HttpAddress.of(request)).execute(request, supplier); return newTransport(HttpAddress.of(request)).execute(request, supplier);
} }
public Transport pooledExecute(Request request) throws IOException {
Transport transport = newTransport();
transport.execute(request);
return transport;
}
/** /**
* For following redirects by a chain of transports. * For following redirects, construct a new transport.
* @param transport the previous transport * @param transport the previous transport
* @param request the new request for continuing the request. * @param request the new request for continuing the request.
*/ */
@ -203,6 +313,13 @@ public final class Client {
close(nextTransport); close(nextTransport);
} }
/**
* Retry request by following a back-off strategy.
*
* @param transport the transport to retry
* @param request the request to retry
* @throws IOException if retry failed
*/
public void retry(Transport transport, Request request) throws IOException { public void retry(Transport transport, Request request) throws IOException {
transport.execute(request); transport.execute(request);
transport.get(); transport.get();
@ -213,7 +330,7 @@ public final class Client {
return newTransport(HttpAddress.of(request)); return newTransport(HttpAddress.of(request));
} }
public void close(Transport transport) { public void close(Transport transport) throws IOException {
if (transportListener != null) { if (transportListener != null) {
transportListener.onClose(transport); transportListener.onClose(transport);
} }
@ -221,21 +338,29 @@ public final class Client {
transports.remove(transport); transports.remove(transport);
} }
public void close() { public void close() throws IOException {
for (Transport transport : transports) { for (Transport transport : transports) {
close(transport); close(transport);
} }
} }
public void shutdown() { public void shutdownGracefully() throws IOException {
eventLoopGroup.shutdownGracefully(); if (hasPooledConnections()) {
} pool.close();
}
public void shutdownGracefully() {
close(); close();
shutdown(); shutdown();
} }
public void shutdown() {
eventLoopGroup.shutdownGracefully();
try {
eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
}
/** /**
* Initialize trust manager factory once per client lifecycle. * Initialize trust manager factory once per client lifecycle.
* @param clientConfig the client config * @param clientConfig the client config
@ -251,6 +376,60 @@ public final class Client {
} }
} }
private static SslHandler newSslHandler(ClientConfig clientConfig, ByteBufAllocator allocator, HttpAddress httpAddress) {
try {
SslContext sslContext = newSslContext(clientConfig);
SslHandler sslHandler = sslContext.newHandler(allocator);
SSLEngine engine = sslHandler.engine();
List<String> serverNames = clientConfig.getServerNamesForIdentification();
if (serverNames.isEmpty()) {
serverNames = Collections.singletonList(httpAddress.getInetSocketAddress().getHostName());
}
SSLParameters params = engine.getSSLParameters();
params.setEndpointIdentificationAlgorithm("HTTPS");
List<SNIServerName> sniServerNames = new ArrayList<>();
for (String serverName : serverNames) {
sniServerNames.add(new SNIHostName(serverName));
}
params.setServerNames(sniServerNames);
engine.setSSLParameters(params);
switch (clientConfig.getClientAuthMode()) {
case NEED:
engine.setNeedClientAuth(true);
break;
case WANT:
engine.setWantClientAuth(true);
break;
default:
break;
}
return sslHandler;
} catch (SSLException e) {
throw new IllegalArgumentException(e);
}
}
private static SslContext newSslContext(ClientConfig clientConfig) throws SSLException {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(clientConfig.getSslProvider())
.ciphers(Http2SecurityUtil.CIPHERS, clientConfig.getCipherSuiteFilter())
.applicationProtocolConfig(newApplicationProtocolConfig());
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
if (clientConfig.getTrustManagerFactory() != null) {
sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
}
return sslContextBuilder.build();
}
private static ApplicationProtocolConfig newApplicationProtocolConfig() {
return new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2);
}
public interface TransportListener { public interface TransportListener {
void onOpen(Transport transport); void onOpen(Transport transport);
@ -269,4 +448,35 @@ public final class Client {
return thread; return thread;
} }
} }
class ClientChannelPoolHandler implements ChannelPoolHandler {
@Override
public void channelReleased(Channel channel) {
}
@Override
public void channelAcquired(Channel channel) {
}
@Override
public void channelCreated(Channel channel) {
HttpAddress httpAddress = channel.attr(pool.getAttributeKey()).get();
HttpVersion httpVersion = httpAddress.getVersion();
SslHandler sslHandler = newSslHandler(clientConfig, byteBufAllocator, httpAddress);
if (httpVersion.majorVersion() == 1) {
HttpChannelInitializer initializer = new HttpChannelInitializer(clientConfig, httpAddress,
sslHandler, httpResponseHandler);
if (channel instanceof SocketChannel) {
initializer.initChannel((SocketChannel) channel);
}
} else {
Http2ChannelInitializer initializer = new Http2ChannelInitializer(clientConfig, httpAddress,
sslHandler, http2SettingsHandler, http2ResponseHandler);
if (channel instanceof SocketChannel) {
initializer.initChannel((SocketChannel) channel);
}
}
}
}
} }

View file

@ -2,10 +2,14 @@ package org.xbib.netty.http.client;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream; import java.io.InputStream;
@ -155,11 +159,6 @@ public class ClientBuilder {
return this; return this;
} }
public ClientBuilder setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
clientConfig.setTrustManagerFactory(trustManagerFactory);
return this;
}
public ClientBuilder setKeyCert(InputStream keyCertChainInputStream, InputStream keyInputStream) { public ClientBuilder setKeyCert(InputStream keyCertChainInputStream, InputStream keyInputStream) {
clientConfig.setKeyCert(keyCertChainInputStream, keyInputStream); clientConfig.setKeyCert(keyCertChainInputStream, keyInputStream);
return this; return this;
@ -171,8 +170,13 @@ public class ClientBuilder {
return this; return this;
} }
public ClientBuilder setServerNameIdentification(boolean serverNameIdentification) { public ClientBuilder setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
clientConfig.setServerNameIdentification(serverNameIdentification); clientConfig.setTrustManagerFactory(trustManagerFactory);
return this;
}
public ClientBuilder trustInsecure() {
clientConfig.setTrustManagerFactory(InsecureTrustManagerFactory.INSTANCE);
return this; return this;
} }
@ -186,6 +190,46 @@ public class ClientBuilder {
return this; return this;
} }
public ClientBuilder addPoolNode(HttpAddress httpAddress) {
clientConfig.addPoolNode(httpAddress);
return this;
}
public ClientBuilder setPoolNodeConnectionLimit(int nodeConnectionLimit) {
clientConfig.setPoolNodeConnectionLimit(nodeConnectionLimit);
return this;
}
public ClientBuilder setRetriesPerPoolNode(int retriesPerNode) {
clientConfig.setRetriesPerPoolNode(retriesPerNode);
return this;
}
public ClientBuilder setPoolVersion(HttpVersion poolVersion) {
clientConfig.setPoolVersion(poolVersion);
return this;
}
public ClientBuilder setPoolSecure(boolean poolSecure) {
clientConfig.setPoolSecure(poolSecure);
return this;
}
public ClientBuilder addServerNameForIdentification(String serverName) {
clientConfig.addServerNameForIdentification(serverName);
return this;
}
public ClientBuilder setHttp2Settings(Http2Settings http2Settings) {
clientConfig.setHttp2Settings(http2Settings);
return this;
}
public ClientBuilder setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
clientConfig.setWriteBufferWaterMark(writeBufferWaterMark);
return this;
}
public Client build() { public Client build() {
return new Client(clientConfig, byteBufAllocator, eventLoopGroup, socketChannelClass); return new Client(clientConfig, byteBufAllocator, eventLoopGroup, socketChannelClass);
} }

View file

@ -1,15 +1,22 @@
package org.xbib.netty.http.client; package org.xbib.netty.http.client;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter; import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.xbib.netty.http.client.retry.BackOff;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream; import java.io.InputStream;
import java.security.KeyStore; import java.security.KeyStore;
import java.security.Provider; import java.security.Provider;
import java.util.ArrayList;
import java.util.List; import java.util.List;
public class ClientConfig { public class ClientConfig {
@ -22,7 +29,18 @@ public class ClientConfig {
boolean DEBUG = false; boolean DEBUG = false;
/** /**
* Default for thread count. * Default debug log level.
*/
LogLevel DEFAULT_DEBUG_LOG_LEVEL = LogLevel.DEBUG;
/**
* The default for selecting epoll. If available, select epoll.
*/
boolean EPOLL = Epoll.isAvailable();
/**
* If set to 0, then Netty will decide about thread count.
* Default is Runtime.getRuntime().availableProcessors() * 2
*/ */
int THREAD_COUNT = 0; int THREAD_COUNT = 0;
@ -110,12 +128,40 @@ public class ClientConfig {
*/ */
CipherSuiteFilter CIPHER_SUITE_FILTER = SupportedCipherSuiteFilter.INSTANCE; CipherSuiteFilter CIPHER_SUITE_FILTER = SupportedCipherSuiteFilter.INSTANCE;
boolean USE_SERVER_NAME_IDENTIFICATION = true;
/** /**
* Default for SSL client authentication. * Default for SSL client authentication.
*/ */
ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE; ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE;
/**
* Default for pool retries per node.
*/
Integer RETRIES_PER_NODE = 0;
/**
* Default pool HTTP version.
*/
HttpVersion POOL_VERSION = HttpVersion.HTTP_1_1;
/**
* Default connection pool security.
*/
Boolean POOL_SECURE = false;
/**
* Default HTTP/2 settings.
*/
Http2Settings HTTP2_SETTINGS = Http2Settings.defaultSettings();
/**
* Default write buffer water mark.
*/
WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = WriteBufferWaterMark.DEFAULT;
/**
* Default for backoff.
*/
BackOff BACK_OFF = BackOff.ZERO_BACKOFF;
} }
private static TrustManagerFactory TRUST_MANAGER_FACTORY; private static TrustManagerFactory TRUST_MANAGER_FACTORY;
@ -123,10 +169,6 @@ public class ClientConfig {
static { static {
try { try {
TRUST_MANAGER_FACTORY = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); TRUST_MANAGER_FACTORY = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
//InsecureTrustManagerFactory.INSTANCE;
//TRUST_MANAGER_FACTORY.init((KeyStore) null);
// java.lang.IllegalStateException: TrustManagerFactoryImpl is not initialized
//TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
} catch (Exception e) { } catch (Exception e) {
TRUST_MANAGER_FACTORY = null; TRUST_MANAGER_FACTORY = null;
} }
@ -134,10 +176,10 @@ public class ClientConfig {
private boolean debug = Defaults.DEBUG; private boolean debug = Defaults.DEBUG;
/** private LogLevel debugLogLevel = Defaults.DEFAULT_DEBUG_LOG_LEVEL;
* If set to 0, then Netty will decide about thread count.
* Default is Runtime.getRuntime().availableProcessors() * 2 private boolean epoll = Defaults.EPOLL;
*/
private int threadCount = Defaults.THREAD_COUNT; private int threadCount = Defaults.THREAD_COUNT;
private boolean tcpNodelay = Defaults.TCP_NODELAY; private boolean tcpNodelay = Defaults.TCP_NODELAY;
@ -178,8 +220,6 @@ public class ClientConfig {
private KeyStore trustManagerKeyStore = null; private KeyStore trustManagerKeyStore = null;
private boolean serverNameIdentification = Defaults.USE_SERVER_NAME_IDENTIFICATION;
private ClientAuthMode clientAuthMode = Defaults.SSL_CLIENT_AUTH_MODE; private ClientAuthMode clientAuthMode = Defaults.SSL_CLIENT_AUTH_MODE;
private InputStream keyCertChainInputStream; private InputStream keyCertChainInputStream;
@ -190,11 +230,23 @@ public class ClientConfig {
private HttpProxyHandler httpProxyHandler; private HttpProxyHandler httpProxyHandler;
private List<HttpAddress> nodes; private List<HttpAddress> poolNodes = new ArrayList<>();
private Integer nodeConnectionLimit; private Integer poolNodeConnectionLimit;
private Integer retriesPerNode; private Integer retriesPerPoolNode = Defaults.RETRIES_PER_NODE;
private HttpVersion poolVersion = Defaults.POOL_VERSION;
private Boolean poolSecure = Defaults.POOL_SECURE;
private List<String> serverNamesForIdentification = new ArrayList<>();
private Http2Settings http2Settings = Defaults.HTTP2_SETTINGS;
private WriteBufferWaterMark writeBufferWaterMark = Defaults.WRITE_BUFFER_WATER_MARK;
private BackOff backOff = Defaults.BACK_OFF;
public ClientConfig setDebug(boolean debug) { public ClientConfig setDebug(boolean debug) {
this.debug = debug; this.debug = debug;
@ -215,6 +267,29 @@ public class ClientConfig {
return debug; return debug;
} }
public ClientConfig setDebugLogLevel(LogLevel debugLogLevel) {
this.debugLogLevel = debugLogLevel;
return this;
}
public LogLevel getDebugLogLevel() {
return debugLogLevel;
}
public ClientConfig enableEpoll() {
this.epoll = true;
return this;
}
public ClientConfig disableEpoll() {
this.epoll = false;
return this;
}
public boolean isEpoll() {
return epoll;
}
public ClientConfig setThreadCount(int threadCount) { public ClientConfig setThreadCount(int threadCount) {
this.threadCount = threadCount; this.threadCount = threadCount;
return this; return this;
@ -341,6 +416,15 @@ public class ClientConfig {
return enableGzip; return enableGzip;
} }
public ClientConfig setHttp2Settings(Http2Settings http2Settings) {
this.http2Settings = http2Settings;
return this;
}
public Http2Settings getHttp2Settings() {
return http2Settings;
}
public ClientConfig setSslProvider(SslProvider sslProvider) { public ClientConfig setSslProvider(SslProvider sslProvider) {
this.sslProvider = sslProvider; this.sslProvider = sslProvider;
return this; return this;
@ -413,15 +497,6 @@ public class ClientConfig {
return keyPassword; return keyPassword;
} }
public ClientConfig setServerNameIdentification(boolean serverNameIdentification) {
this.serverNameIdentification = serverNameIdentification;
return this;
}
public boolean isServerNameIdentification() {
return serverNameIdentification;
}
public ClientConfig setClientAuthMode(ClientAuthMode clientAuthMode) { public ClientConfig setClientAuthMode(ClientAuthMode clientAuthMode) {
this.clientAuthMode = clientAuthMode; this.clientAuthMode = clientAuthMode;
return this; return this;
@ -458,31 +533,81 @@ public class ClientConfig {
return httpProxyHandler; return httpProxyHandler;
} }
public ClientConfig setNodes(List<HttpAddress> nodes) { public ClientConfig setPoolNodes(List<HttpAddress> poolNodes) {
this.nodes = nodes; this.poolNodes = poolNodes;
return this; return this;
} }
public List<HttpAddress> getNodes() { public List<HttpAddress> getPoolNodes() {
return nodes; return poolNodes;
} }
public ClientConfig setNodeConnectionLimit(Integer nodeConnectionLimit) { public ClientConfig addPoolNode(HttpAddress poolNodeAddress) {
this.nodeConnectionLimit = nodeConnectionLimit; this.poolNodes.add(poolNodeAddress);
return this; return this;
} }
public Integer getNodeConnectionLimit() { public ClientConfig setPoolNodeConnectionLimit(Integer poolNodeConnectionLimit) {
return nodeConnectionLimit; this.poolNodeConnectionLimit = poolNodeConnectionLimit;
}
public ClientConfig setRetriesPerNode(Integer retriesPerNode) {
this.retriesPerNode = retriesPerNode;
return this; return this;
} }
public Integer getRetriesPerNode() { public Integer getPoolNodeConnectionLimit() {
return retriesPerNode; return poolNodeConnectionLimit;
}
public ClientConfig setRetriesPerPoolNode(Integer retriesPerPoolNode) {
this.retriesPerPoolNode = retriesPerPoolNode;
return this;
}
public Integer getRetriesPerPoolNode() {
return retriesPerPoolNode;
}
public ClientConfig setPoolVersion(HttpVersion poolVersion) {
this.poolVersion = poolVersion;
return this;
}
public HttpVersion getPoolVersion() {
return poolVersion;
}
public ClientConfig setPoolSecure(boolean poolSecure) {
this.poolSecure = poolSecure;
return this;
}
public boolean isPoolSecure() {
return poolSecure;
}
public ClientConfig addServerNameForIdentification(String serverNameForIdentification) {
this.serverNamesForIdentification.add(serverNameForIdentification);
return this;
}
public List<String> getServerNamesForIdentification() {
return serverNamesForIdentification;
}
public ClientConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
this.writeBufferWaterMark = writeBufferWaterMark;
return this;
}
public WriteBufferWaterMark getWriteBufferWaterMark() {
return writeBufferWaterMark;
}
public ClientConfig setBackOff(BackOff backOff) {
this.backOff = backOff;
return this;
}
public BackOff getBackOff() {
return backOff;
} }
@Override @Override
@ -491,7 +616,5 @@ public class ClientConfig {
sb.append("SSL=").append(sslProvider) sb.append("SSL=").append(sslProvider)
.append(",SSL context provider=").append(sslContextProvider != null ? sslContextProvider.getName() : "<none>"); .append(",SSL context provider=").append(sslContextProvider != null ? sslContextProvider.getName() : "<none>");
return sb.toString(); return sb.toString();
} }
} }

View file

@ -10,14 +10,18 @@ import org.xbib.net.URL;
import org.xbib.netty.http.client.listener.CookieListener; import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener; import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.retry.BackOff;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/** /**
* * HTTP client request.
*/ */
public class Request { public class Request implements Closeable {
private final URL base; private final URL base;
@ -33,7 +37,7 @@ public class Request {
private final ByteBuf content; private final ByteBuf content;
private final int timeout; private final long timeoutInMillis;
private final boolean followRedirect; private final boolean followRedirect;
@ -41,6 +45,12 @@ public class Request {
private int redirectCount; private int redirectCount;
private final boolean isBackOff;
private final BackOff backOff;
private CompletableFuture<?> completableFuture;
private HttpResponseListener responseListener; private HttpResponseListener responseListener;
private HttpHeadersListener headersListener; private HttpHeadersListener headersListener;
@ -50,7 +60,8 @@ public class Request {
Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod, Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod,
HttpHeaders headers, Collection<Cookie> cookies, HttpHeaders headers, Collection<Cookie> cookies,
String uri, ByteBuf content, String uri, ByteBuf content,
int timeout, boolean followRedirect, int maxRedirect, int redirectCount) { long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount,
boolean isBackOff, BackOff backOff) {
this.base = url; this.base = url;
this.httpVersion = httpVersion; this.httpVersion = httpVersion;
this.httpMethod = httpMethod; this.httpMethod = httpMethod;
@ -58,10 +69,12 @@ public class Request {
this.cookies = cookies; this.cookies = cookies;
this.uri = uri; this.uri = uri;
this.content = content; this.content = content;
this.timeout = timeout; this.timeoutInMillis = timeoutInMillis;
this.followRedirect = followRedirect; this.followRedirect = followRedirect;
this.maxRedirects = maxRedirect; this.maxRedirects = maxRedirect;
this.redirectCount = redirectCount; this.redirectCount = redirectCount;
this.isBackOff = isBackOff;
this.backOff = backOff;
} }
public URL base() { public URL base() {
@ -92,15 +105,27 @@ public class Request {
return content; return content;
} }
public int getTimeout() { /**
return timeout; * Return the timeout in milliseconds per request. This overrides the read timeout of the client.
* @return timeout timeout in milliseconds
*/
public long getTimeoutInMillis() {
return timeoutInMillis;
} }
public boolean isFollowRedirect() { public boolean isFollowRedirect() {
return followRedirect; return followRedirect;
} }
public boolean checkRedirect() { public boolean isBackOff() {
return isBackOff;
}
public BackOff getBackOff() {
return backOff;
}
public boolean canRedirect() {
if (!followRedirect) { if (!followRedirect) {
return false; return false;
} }
@ -124,6 +149,15 @@ public class Request {
return sb.toString(); return sb.toString();
} }
public Request setCompletableFuture(CompletableFuture<?> completableFuture) {
this.completableFuture = completableFuture;
return this;
}
public CompletableFuture<?> getCompletableFuture() {
return completableFuture;
}
public Request setHeadersListener(HttpHeadersListener httpHeadersListener) { public Request setHeadersListener(HttpHeadersListener httpHeadersListener) {
this.headersListener = httpHeadersListener; this.headersListener = httpHeadersListener;
return this; return this;
@ -190,4 +224,11 @@ public class Request {
public static RequestBuilder builder(HttpMethod httpMethod) { public static RequestBuilder builder(HttpMethod httpMethod) {
return new RequestBuilder().setMethod(httpMethod); return new RequestBuilder().setMethod(httpMethod);
} }
@Override
public void close() throws IOException {
if (content != null) {
content.release();
}
}
} }

View file

@ -16,6 +16,7 @@ import io.netty.util.AsciiString;
import org.xbib.net.QueryParameters; import org.xbib.net.QueryParameters;
import org.xbib.net.URL; import org.xbib.net.URL;
import org.xbib.net.URLSyntaxException; import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.retry.BackOff;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -44,7 +45,7 @@ public class RequestBuilder {
private static final boolean DEFAULT_FOLLOW_REDIRECT = true; private static final boolean DEFAULT_FOLLOW_REDIRECT = true;
private static final int DEFAULT_TIMEOUT_MILLIS = 5000; private static final long DEFAULT_TIMEOUT_MILLIS = -1L;
private static final int DEFAULT_MAX_REDIRECT = 10; private static final int DEFAULT_MAX_REDIRECT = 10;
@ -74,12 +75,16 @@ public class RequestBuilder {
private ByteBuf content; private ByteBuf content;
private int timeout; private long timeoutInMillis;
private boolean followRedirect; private boolean followRedirect;
private int maxRedirects; private int maxRedirects;
private boolean enableBackOff;
private BackOff backOff;
RequestBuilder() { RequestBuilder() {
httpMethod = DEFAULT_METHOD; httpMethod = DEFAULT_METHOD;
httpVersion = DEFAULT_HTTP_VERSION; httpVersion = DEFAULT_HTTP_VERSION;
@ -87,7 +92,7 @@ public class RequestBuilder {
gzip = DEFAULT_GZIP; gzip = DEFAULT_GZIP;
keepalive = DEFAULT_KEEPALIVE; keepalive = DEFAULT_KEEPALIVE;
url = DEFAULT_URL; url = DEFAULT_URL;
timeout = DEFAULT_TIMEOUT_MILLIS; timeoutInMillis = DEFAULT_TIMEOUT_MILLIS;
followRedirect = DEFAULT_FOLLOW_REDIRECT; followRedirect = DEFAULT_FOLLOW_REDIRECT;
maxRedirects = DEFAULT_MAX_REDIRECT; maxRedirects = DEFAULT_MAX_REDIRECT;
headers = new DefaultHttpHeaders(); headers = new DefaultHttpHeaders();
@ -121,8 +126,8 @@ public class RequestBuilder {
return this; return this;
} }
public RequestBuilder setTimeout(int timeout) { public RequestBuilder setTimeoutInMillis(long timeoutInMillis) {
this.timeout = timeout; this.timeoutInMillis = timeoutInMillis;
return this; return this;
} }
@ -207,6 +212,16 @@ public class RequestBuilder {
return this; return this;
} }
public RequestBuilder enableBackOff(boolean enableBackOff) {
this.enableBackOff = enableBackOff;
return this;
}
public RequestBuilder setBackOff(BackOff backOff) {
this.backOff = backOff;
return this;
}
public RequestBuilder setUserAgent(String userAgent) { public RequestBuilder setUserAgent(String userAgent) {
this.userAgent = userAgent; this.userAgent = userAgent;
return this; return this;
@ -255,14 +270,10 @@ public class RequestBuilder {
throw new IllegalStateException("host in URL not defined: " + url); throw new IllegalStateException("host in URL not defined: " + url);
} }
if (uri != null) { if (uri != null) {
if (this.url != null) { try {
try { url = URL.base(url).resolve(uri);
url = URL.base(url).resolve(uri); } catch (URLSyntaxException e) {
} catch (URLSyntaxException e) { throw new IllegalArgumentException(e);
throw new IllegalArgumentException(e);
}
} else {
url(uri);
} }
} }
// add explicit parameters to URL // add explicit parameters to URL
@ -320,7 +331,7 @@ public class RequestBuilder {
validatedHeaders.remove(headerName); validatedHeaders.remove(headerName);
} }
return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, uri, content, return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, uri, content,
timeout, followRedirect, maxRedirects, 0); timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff);
} }
private void addHeader(AsciiString name, Object value) { private void addHeader(AsciiString name, Object value) {

View file

@ -1,30 +0,0 @@
package org.xbib.netty.http.client.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A Netty handler that logs user events and find expetced ones.
*/
@ChannelHandler.Sharable
class UserEventLogger extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(UserEventLogger.class.getName());
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.log(Level.FINE, () -> "got user event " + evt);
if (evt instanceof SslCloseCompletionEvent ||
evt instanceof ChannelInputShutdownReadComplete) {
logger.log(Level.FINE, () -> "user event is expected: " + evt);
return;
}
super.userEventTriggered(ctx, evt);
}
}

View file

@ -6,87 +6,59 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext; import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.handler.TrafficLoggingHandler;
import javax.net.ssl.SNIHostName; import java.util.logging.Level;
import javax.net.ssl.SSLEngine; import java.util.logging.Logger;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import java.util.Collections;
public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> { public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
private static final Logger logger = Logger.getLogger(HttpChannelInitializer.class.getName());
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
private final HttpAddress httpAddress; private final HttpAddress httpAddress;
private final SslHandler sslHandler;
private final HttpResponseHandler httpResponseHandler; private final HttpResponseHandler httpResponseHandler;
public HttpChannelInitializer(ClientConfig clientConfig, HttpAddress httpAddress, HttpResponseHandler httpResponseHandler) { public HttpChannelInitializer(ClientConfig clientConfig,
HttpAddress httpAddress,
SslHandler sslHandler,
HttpResponseHandler httpResponseHandler) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.httpAddress = httpAddress; this.httpAddress = httpAddress;
this.sslHandler = sslHandler;
this.httpResponseHandler = httpResponseHandler; this.httpResponseHandler = httpResponseHandler;
} }
@Override @Override
protected void initChannel(SocketChannel ch) { public void initChannel(SocketChannel channel) {
if (clientConfig.isDebug()) { if (clientConfig.isDebug()) {
ch.pipeline().addLast(new TrafficLoggingHandler()); channel.pipeline().addLast(new TrafficLoggingHandler(LogLevel.DEBUG));
} }
if (httpAddress.isSecure()) { if (httpAddress.isSecure()) {
configureEncryptedHttp1(ch); configureEncrypted(channel);
} else { } else {
configureCleartextHttp1(ch); configureCleartext(channel);
}
if (clientConfig.isDebug()) {
logger.log(Level.FINE, "HTTP 1 channel initialized: " + channel.pipeline().names());
} }
} }
private void configureEncryptedHttp1(SocketChannel ch) { private void configureEncrypted(SocketChannel channel) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = channel.pipeline();
try { pipeline.addLast(sslHandler);
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() configureCleartext(channel);
.sslProvider(clientConfig.getSslProvider())
.keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(),
clientConfig.getKeyPassword())
.ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter());
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
if (clientConfig.getTrustManagerFactory() != null) {
sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
}
SslContext sslContext = sslContextBuilder.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();
if (clientConfig.isServerNameIdentification()) {
String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName();
SSLParameters params = engine.getSSLParameters();
params.setServerNames(Collections.singletonList(new SNIHostName(fullQualifiedHostname)));
engine.setSSLParameters(params);
}
pipeline.addLast(sslHandler);
switch (clientConfig.getClientAuthMode()) {
case NEED:
engine.setNeedClientAuth(true);
break;
case WANT:
engine.setWantClientAuth(true);
break;
default:
break;
}
} catch (SSLException e) {
throw new IllegalStateException("unable to configure SSL: " + e.getMessage(), e);
}
configureCleartextHttp1(ch);
} }
private void configureCleartextHttp1(SocketChannel ch) { private void configureCleartext(SocketChannel channel) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpClientCodec(clientConfig.getMaxInitialLineLength(), pipeline.addLast(new HttpClientCodec(clientConfig.getMaxInitialLineLength(),
clientConfig.getMaxHeadersSize(), clientConfig.getMaxChunkSize())); clientConfig.getMaxHeadersSize(), clientConfig.getMaxChunkSize()));
if (clientConfig.isEnableGzip()) { if (clientConfig.isEnableGzip()) {

View file

@ -1,4 +1,4 @@
package org.xbib.netty.http.client.handler; package org.xbib.netty.http.client.handler.http1;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -13,8 +13,8 @@ import io.netty.handler.logging.LoggingHandler;
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class TrafficLoggingHandler extends LoggingHandler { public class TrafficLoggingHandler extends LoggingHandler {
public TrafficLoggingHandler() { public TrafficLoggingHandler(LogLevel level) {
super("client", LogLevel.TRACE); super("client", level);
} }
@Override @Override

View file

@ -7,25 +7,14 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder; import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder; import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import java.util.Collections;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -37,30 +26,69 @@ public class Http2ChannelInitializer extends ChannelInitializer<SocketChannel> {
private final HttpAddress httpAddress; private final HttpAddress httpAddress;
private final SslHandler sslHandler;
private final Http2SettingsHandler http2SettingsHandler; private final Http2SettingsHandler http2SettingsHandler;
private final Http2ResponseHandler http2ResponseHandler; private final Http2ResponseHandler http2ResponseHandler;
public Http2ChannelInitializer(ClientConfig clientConfig, public Http2ChannelInitializer(ClientConfig clientConfig,
HttpAddress httpAddress, HttpAddress httpAddress,
SslHandler sslHandler,
Http2SettingsHandler http2SettingsHandler, Http2SettingsHandler http2SettingsHandler,
Http2ResponseHandler http2ResponseHandler) { Http2ResponseHandler http2ResponseHandler) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.httpAddress = httpAddress; this.httpAddress = httpAddress;
this.sslHandler = sslHandler;
this.http2SettingsHandler = http2SettingsHandler; this.http2SettingsHandler = http2SettingsHandler;
this.http2ResponseHandler = http2ResponseHandler; this.http2ResponseHandler = http2ResponseHandler;
} }
/** /**
* The channel initialization for HTTP/2 is always encrypted. * The channel initialization for HTTP/2.
* The reason is there is no known HTTP/2 server supporting cleartext.
* *
* @param ch socket channel * @param channel socket channel
*/ */
@Override @Override
protected void initChannel(SocketChannel ch) { public void initChannel(SocketChannel channel) {
if (httpAddress.isSecure()) {
configureEncrypted(channel);
} else {
configureCleartext(channel);
}
if (clientConfig.isDebug()) {
logger.log(Level.FINE, "HTTP/2 channel initialized: " + channel.pipeline().names());
}
}
private void configureEncrypted(SocketChannel channel) {
channel.pipeline().addLast(sslHandler);
ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler("") {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ctx.pipeline().addLast(newConnectionHandler(), http2SettingsHandler, http2ResponseHandler);
if (clientConfig.isDebug()) {
logger.log(Level.FINE, "after negotiation: " + ctx.pipeline().names());
}
return;
}
// we do not fall back to HTTP1
ctx.close();
throw new IllegalStateException("protocol not accepted: " + protocol);
}
};
channel.pipeline().addLast(negotiationHandler);
}
private void configureCleartext(SocketChannel ch) {
ch.pipeline().addLast(newConnectionHandler(), http2SettingsHandler, http2ResponseHandler);
}
private Http2ConnectionHandler newConnectionHandler() {
Http2Connection http2Connection = new DefaultHttp2Connection(false); Http2Connection http2Connection = new DefaultHttp2Connection(false);
HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder() HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder()
.initialSettings(clientConfig.getHttp2Settings())
.connection(http2Connection) .connection(http2Connection)
.frameListener(new Http2PushPromiseHandler(http2Connection, .frameListener(new Http2PushPromiseHandler(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection) new InboundHttp2ToHttpAdapterBuilder(http2Connection)
@ -68,48 +96,8 @@ public class Http2ChannelInitializer extends ChannelInitializer<SocketChannel> {
.propagateSettings(true) .propagateSettings(true)
.build())); .build()));
if (clientConfig.isDebug()) { if (clientConfig.isDebug()) {
http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client")); http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(clientConfig.getDebugLogLevel(), "client"));
}
Http2ConnectionHandler http2ConnectionHandler = http2ConnectionHandlerBuilder.build();
try {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(clientConfig.getSslProvider())
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2));
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
if (clientConfig.getTrustManagerFactory() != null) {
sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
}
SslContext sslContext = sslContextBuilder.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();
if (clientConfig.isServerNameIdentification()) {
String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName();
SSLParameters params = engine.getSSLParameters();
params.setServerNames(Collections.singletonList(new SNIHostName(fullQualifiedHostname)));
engine.setSSLParameters(params);
}
ch.pipeline().addLast(sslHandler);
ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler("") {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ctx.pipeline().addLast(http2ConnectionHandler, http2SettingsHandler, http2ResponseHandler);
return;
}
ctx.close();
throw new IllegalStateException("unknown protocol: " + protocol);
}
};
ch.pipeline().addLast(negotiationHandler);
} catch (SSLException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} }
return http2ConnectionHandlerBuilder.build();
} }
} }

View file

@ -6,14 +6,18 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -22,12 +26,16 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> { public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName()); private static final Logger logger = Logger.getLogger(BoundedChannelPool.class.getName());
private final Semaphore semaphore; private final Semaphore semaphore;
private final HttpVersion httpVersion;
private final boolean isSecure;
private final ChannelPoolHandler channelPoolhandler; private final ChannelPoolHandler channelPoolhandler;
private final List<K> nodes; private final List<K> nodes;
@ -52,6 +60,8 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
/** /**
* @param semaphore the concurrency level * @param semaphore the concurrency level
* @param httpVersion the HTTP version of the pool connections
* @param isSecure if this pool has secure connections
* @param nodes the endpoint nodes, any element may contain the port (followed after ":") * @param nodes the endpoint nodes, any element may contain the port (followed after ":")
* to override the defaultPort argument * to override the defaultPort argument
* @param bootstrap bootstrap instance * @param bootstrap bootstrap instance
@ -59,16 +69,19 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
* @param retriesPerNode the max count of the subsequent connection failures to the node before * @param retriesPerNode the max count of the subsequent connection failures to the node before
* the node will be excluded from the pool. If set to 0, the value is ignored. * the node will be excluded from the pool. If set to 0, the value is ignored.
*/ */
public SimpleChannelPool(Semaphore semaphore, List<K> nodes, Bootstrap bootstrap, public BoundedChannelPool(Semaphore semaphore, HttpVersion httpVersion, boolean isSecure,
ChannelPoolHandler channelPoolHandler, int retriesPerNode) { List<K> nodes, Bootstrap bootstrap,
ChannelPoolHandler channelPoolHandler, int retriesPerNode) {
this.semaphore = semaphore; this.semaphore = semaphore;
this.httpVersion = httpVersion;
this.isSecure = isSecure;
this.channelPoolhandler = channelPoolHandler; this.channelPoolhandler = channelPoolHandler;
this.nodes = nodes; this.nodes = nodes;
this.retriesPerNode = retriesPerNode; this.retriesPerNode = retriesPerNode;
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
this.attributeKey = AttributeKey.valueOf("poolKey"); this.attributeKey = AttributeKey.valueOf("poolKey");
if (nodes == null || nodes.isEmpty()) { if (nodes == null || nodes.isEmpty()) {
throw new IllegalArgumentException("empty nodes array argument"); throw new IllegalArgumentException("nodes must not be empty");
} }
this.numberOfNodes = nodes.size(); this.numberOfNodes = nodes.size();
bootstraps = new HashMap<>(numberOfNodes); bootstraps = new HashMap<>(numberOfNodes);
@ -77,46 +90,48 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
counts = new HashMap<>(numberOfNodes); counts = new HashMap<>(numberOfNodes);
failedCounts = new HashMap<>(numberOfNodes); failedCounts = new HashMap<>(numberOfNodes);
for (K node : nodes) { for (K node : nodes) {
ChannelPoolInitializer initializer = new ChannelPoolInitializer(node, channelPoolHandler);
bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress()) bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress())
.handler(new ChannelInitializer<Channel>() { .handler(initializer));
@Override
protected void initChannel(Channel channel) throws Exception {
if(!channel.eventLoop().inEventLoop()) {
throw new IllegalStateException();
}
if (channelPoolHandler != null) {
channelPoolHandler.channelCreated(channel);
}
}
}));
availableChannels.put(node, new ConcurrentLinkedQueue<>()); availableChannels.put(node, new ConcurrentLinkedQueue<>());
counts.put(node, 0); counts.put(node, 0);
failedCounts.put(node, 0); failedCounts.put(node, 0);
} }
} }
public HttpVersion getVersion() {
return httpVersion;
}
public boolean isSecure() {
return isSecure;
}
public AttributeKey<K> getAttributeKey() {
return attributeKey;
}
@Override @Override
public void prepare(int count) throws ConnectException { public void prepare(int channelCount) throws ConnectException {
if (count > 0) { if (channelCount <= 0) {
for (int i = 0; i < count; i ++) { throw new IllegalArgumentException("channel count must be greater zero, but got " + channelCount);
Channel channel = connectToAnyNode();
if (channel == null) {
throw new ConnectException("failed to prepare the connections");
}
K nodeAddr = channel.attr(attributeKey).get();
if (channel.isActive()) {
Queue<Channel> channelQueue = availableChannels.get(nodeAddr);
if (channelQueue != null) {
channelQueue.add(channel);
}
} else {
channel.close();
}
}
logger.log(Level.FINE,"prepared " + count + " connections");
} else {
throw new IllegalArgumentException("Connection count should be > 0, but got " + count);
} }
for (int i = 0; i < channelCount; i++) {
Channel channel = newConnection();
if (channel == null) {
throw new ConnectException("failed to prepare");
}
K key = channel.attr(attributeKey).get();
if (channel.isActive()) {
Queue<Channel> channelQueue = availableChannels.get(key);
if (channelQueue != null) {
channelQueue.add(channel);
}
} else {
channel.close();
}
}
logger.log(Level.FINE,"prepared " + channelCount + " channels");
} }
@Override @Override
@ -124,7 +139,7 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
Channel channel = null; Channel channel = null;
if (semaphore.tryAcquire()) { if (semaphore.tryAcquire()) {
if ((channel = poll()) == null) { if ((channel = poll()) == null) {
channel = connectToAnyNode(); channel = newConnection();
} }
if (channel == null) { if (channel == null) {
semaphore.release(); semaphore.release();
@ -150,7 +165,7 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
Channel channel; Channel channel;
for (int i = 0; i < availableCount; i ++) { for (int i = 0; i < availableCount; i ++) {
if (null == (channel = poll())) { if (null == (channel = poll())) {
channel = connectToAnyNode(); channel = newConnection();
} }
if (channel == null) { if (channel == null) {
semaphore.release(availableCount - i); semaphore.release(availableCount - i);
@ -167,18 +182,23 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
@Override @Override
public void release(Channel channel) throws Exception { public void release(Channel channel) throws Exception {
K nodeAddr = channel.attr(attributeKey).get(); try {
if (channel.isActive()) { if (channel != null) {
Queue<Channel> channelQueue = availableChannels.get(nodeAddr); if (channel.isActive()) {
if (channelQueue != null) { K key = channel.attr(attributeKey).get();
channelQueue.add(channel); Queue<Channel> channelQueue = availableChannels.get(key);
if (channelQueue != null) {
channelQueue.add(channel);
}
} else if (channel.isOpen()) {
channel.close();
}
if (channelPoolhandler != null) {
channelPoolhandler.channelReleased(channel);
}
} }
} finally {
semaphore.release(); semaphore.release();
} else {
channel.close();
}
if (channelPoolhandler != null) {
channelPoolhandler.channelReleased(channel);
} }
} }
@ -193,65 +213,63 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
public void close() { public void close() {
lock.lock(); lock.lock();
try { try {
int closedConnCount = 0; int count = 0;
for (K nodeAddr : availableChannels.keySet()) { Set<Channel> channelSet = new HashSet<>();
for (Channel conn : availableChannels.get(nodeAddr)) { for (Map.Entry<K, Queue<Channel>> entry : availableChannels.entrySet()) {
if (conn.isOpen()) { channelSet.addAll(entry.getValue());
conn.close(); }
closedConnCount++; for (Map.Entry<K, List<Channel>> entry : channels.entrySet()) {
} channelSet.addAll(entry.getValue());
}
for (Channel channel : channelSet) {
if (channel != null && channel.isOpen()) {
logger.log(Level.FINE, "closing channel " + channel);
channel.close();
count++;
} }
} }
availableChannels.clear(); availableChannels.clear();
for (K nodeAddr : channels.keySet()) {
for (Channel channel : channels.get(nodeAddr)) {
if (channel != null && channel.isOpen()) {
channel.close();
closedConnCount++;
}
}
}
channels.clear(); channels.clear();
bootstraps.clear(); bootstraps.clear();
counts.clear(); counts.clear();
logger.log(Level.FINE, "closed " + closedConnCount + " connections"); logger.log(Level.FINE, "closed " + count + " connections");
} finally { } finally {
lock.unlock(); lock.unlock();
} }
} }
private Channel connectToAnyNode() throws ConnectException { private Channel newConnection() throws ConnectException {
Channel channel = null; Channel channel = null;
K nodeAddr = null; K key = null;
K nextNodeAddr; K nextKey;
int min = Integer.MAX_VALUE; int min = Integer.MAX_VALUE;
int next; int next;
int i = ThreadLocalRandom.current().nextInt(numberOfNodes); int i = ThreadLocalRandom.current().nextInt(numberOfNodes);
for (int j = i; j < numberOfNodes; j ++) { for (int j = i; j < numberOfNodes; j ++) {
nextNodeAddr = nodes.get(j % numberOfNodes); nextKey = nodes.get(j % numberOfNodes);
next = counts.get(nextNodeAddr); next = counts.get(nextKey);
if(next == 0) { if (next == 0) {
nodeAddr = nextNodeAddr; key = nextKey;
break; break;
} else if (next < min) { } else if (next < min) {
min = next; min = next;
nodeAddr = nextNodeAddr; key = nextKey;
} }
} }
if (nodeAddr != null) { if (key != null) {
logger.log(Level.FINE, "trying connection to " + nodeAddr); logger.log(Level.FINE, "trying connection to " + key);
try { try {
channel = connect(nodeAddr); channel = connect(key);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "failed to create a new connection to " + nodeAddr + ": " + e.toString()); logger.log(Level.WARNING, "failed to create a new connection to " + key + ": " + e.toString());
if (retriesPerNode > 0) { if (retriesPerNode > 0) {
int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1; int selectedNodeFailedConnAttemptsCount = failedCounts.get(key) + 1;
failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount); failedCounts.put(key, selectedNodeFailedConnAttemptsCount);
if (selectedNodeFailedConnAttemptsCount > retriesPerNode) { if (selectedNodeFailedConnAttemptsCount > retriesPerNode) {
logger.log(Level.WARNING, "failed to connect to the node " + nodeAddr + " " logger.log(Level.WARNING, "failed to connect to the node " + key + " "
+ selectedNodeFailedConnAttemptsCount + " times, " + selectedNodeFailedConnAttemptsCount + " times, "
+ "excluding the node from the connection pool"); + "excluding the node from the connection pool");
counts.put(nodeAddr, Integer.MAX_VALUE); counts.put(key, Integer.MAX_VALUE);
boolean allNodesExcluded = true; boolean allNodesExcluded = true;
for (K node : nodes) { for (K node : nodes) {
if (counts.get(node) < Integer.MAX_VALUE) { if (counts.get(node) < Integer.MAX_VALUE) {
@ -272,22 +290,22 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
} }
} }
if (channel != null) { if (channel != null) {
channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel)); channel.closeFuture().addListener(new CloseChannelListener(key, channel));
channel.attr(attributeKey).set(nodeAddr); channel.attr(attributeKey).set(key);
channels.computeIfAbsent(nodeAddr, node -> new ArrayList<>()).add(channel); channels.computeIfAbsent(key, node -> new ArrayList<>()).add(channel);
synchronized (counts) { synchronized (counts) {
counts.put(nodeAddr, counts.get(nodeAddr) + 1); counts.put(key, counts.get(key) + 1);
} }
if(retriesPerNode > 0) { if (retriesPerNode > 0) {
failedCounts.put(nodeAddr, 0); failedCounts.put(key, 0);
} }
logger.log(Level.FINE,"new connection to " + nodeAddr + " created"); logger.log(Level.FINE,"new connection to " + key + " created");
} }
return channel; return channel;
} }
private Channel connect(K addr) throws Exception { private Channel connect(K key) throws Exception {
Bootstrap bootstrap = bootstraps.get(addr); Bootstrap bootstrap = bootstraps.get(key);
if (bootstrap != null) { if (bootstrap != null) {
return bootstrap.connect().sync().channel(); return bootstrap.connect().sync().channel();
} }
@ -312,26 +330,26 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
private class CloseChannelListener implements ChannelFutureListener { private class CloseChannelListener implements ChannelFutureListener {
private final K nodeAddr; private final K key;
private final Channel channel; private final Channel channel;
private CloseChannelListener(K nodeAddr, Channel channel) { private CloseChannelListener(K key, Channel channel) {
this.nodeAddr = nodeAddr; this.key = key;
this.channel = channel; this.channel = channel;
} }
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
logger.log(Level.FINE,"connection to " + nodeAddr + " closed"); logger.log(Level.FINE,"connection to " + key + " closed");
lock.lock(); lock.lock();
try { try {
synchronized (counts) { synchronized (counts) {
if (counts.containsKey(nodeAddr)) { if (counts.containsKey(key)) {
counts.put(nodeAddr, counts.get(nodeAddr) - 1); counts.put(key, counts.get(key) - 1);
} }
} }
synchronized (channels) { synchronized (channels) {
List<Channel> channels = SimpleChannelPool.this.channels.get(nodeAddr); List<Channel> channels = BoundedChannelPool.this.channels.get(key);
if (channels != null) { if (channels != null) {
channels.remove(channel); channels.remove(channel);
} }
@ -342,4 +360,27 @@ public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
} }
} }
} }
class ChannelPoolInitializer extends ChannelInitializer<SocketChannel> {
private final K key;
private final ChannelPoolHandler channelPoolHandler;
ChannelPoolInitializer(K key, ChannelPoolHandler channelPoolHandler) {
this.key = key;
this.channelPoolHandler = channelPoolHandler;
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
if (!channel.eventLoop().inEventLoop()) {
throw new IllegalStateException();
}
channel.attr(attributeKey).set(key);
if (channelPoolHandler != null) {
channelPoolHandler.channelCreated(channel);
}
}
}
} }

View file

@ -36,7 +36,13 @@ public class RestClient {
public String asString() { public String asString() {
ByteBuf byteBuf = response != null ? response.content() : null; ByteBuf byteBuf = response != null ? response.content() : null;
return byteBuf != null && byteBuf.isReadable() ? response.content().toString(StandardCharsets.UTF_8) : null; try {
return byteBuf != null && byteBuf.isReadable() ? response.content().toString(StandardCharsets.UTF_8) : null;
} finally {
if (byteBuf != null) {
byteBuf.release();
}
}
} }
public static RestClient get(String urlString) throws IOException { public static RestClient get(String urlString) throws IOException {

View file

@ -14,7 +14,7 @@ public interface BackOff {
/** /**
* Reset to initial state. * Reset to initial state.
*/ */
void reset() throws IOException; void reset();
/** /**
* Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
@ -33,7 +33,7 @@ public interface BackOff {
} }
* </pre> * </pre>
*/ */
long nextBackOffMillis() throws IOException; long nextBackOffMillis();
/** /**
* Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried

View file

@ -137,22 +137,19 @@ public class ExponentialBackOff implements BackOff {
/** /**
* @param builder builder * @param builder builder
*/ */
protected ExponentialBackOff(Builder builder) { private ExponentialBackOff(Builder builder) {
initialIntervalMillis = builder.initialIntervalMillis; initialIntervalMillis = builder.initialIntervalMillis;
randomizationFactor = builder.randomizationFactor; randomizationFactor = builder.randomizationFactor;
multiplier = builder.multiplier; multiplier = builder.multiplier;
maxIntervalMillis = builder.maxIntervalMillis; maxIntervalMillis = builder.maxIntervalMillis;
maxElapsedTimeMillis = builder.maxElapsedTimeMillis; maxElapsedTimeMillis = builder.maxElapsedTimeMillis;
nanoClock = builder.nanoClock; nanoClock = builder.nanoClock;
//Preconditions.checkArgument(initialIntervalMillis > 0);
//Preconditions.checkArgument(0 <= randomizationFactor && randomizationFactor < 1);
//Preconditions.checkArgument(multiplier >= 1);
//Preconditions.checkArgument(maxIntervalMillis >= initialIntervalMillis);
//Preconditions.checkArgument(maxElapsedTimeMillis > 0);
reset(); reset();
} }
/** Sets the interval back to the initial retry interval and restarts the timer. */ /**
* Sets the interval back to the initial retry interval and restarts the timer.
*/
public final void reset() { public final void reset() {
currentIntervalMillis = initialIntervalMillis; currentIntervalMillis = initialIntervalMillis;
startTimeNanos = nanoClock.nanoTime(); startTimeNanos = nanoClock.nanoTime();
@ -275,6 +272,29 @@ public class ExponentialBackOff implements BackOff {
} }
} }
/**
* Nano clock which can be used to measure elapsed time in nanoseconds.
*
* <p>
* The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations
* may be used for testing.
* </p>
*
*/
public interface NanoClock {
/**
* Returns the current value of the most precise available system timer, in nanoseconds for use to
* measure elapsed time, to match the behavior of {@link System#nanoTime()}.
*/
long nanoTime();
/**
* Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
*/
NanoClock SYSTEM = System::nanoTime;
}
/** /**
* Builder for {@link ExponentialBackOff}. * Builder for {@link ExponentialBackOff}.
* *
@ -285,7 +305,7 @@ public class ExponentialBackOff implements BackOff {
public static class Builder { public static class Builder {
/** The initial retry interval in milliseconds. */ /** The initial retry interval in milliseconds. */
int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS; private int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS;
/** /**
* The randomization factor to use for creating a range around the retry interval. * The randomization factor to use for creating a range around the retry interval.
@ -295,32 +315,53 @@ public class ExponentialBackOff implements BackOff {
* above the retry interval. * above the retry interval.
* </p> * </p>
*/ */
double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR; private double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR;
/** The value to multiply the current interval with for each retry attempt. */ /**
double multiplier = DEFAULT_MULTIPLIER; * The value to multiply the current interval with for each retry attempt.
*/
private double multiplier = DEFAULT_MULTIPLIER;
/** /**
* The maximum value of the back off period in milliseconds. Once the retry interval reaches * The maximum value of the back off period in milliseconds. Once the retry interval reaches
* this value it stops increasing. * this value it stops increasing.
*/ */
int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS; private int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS;
/** /**
* The maximum elapsed time in milliseconds after instantiating {@link ExponentialBackOff} or * The maximum elapsed time in milliseconds after instantiating {@link ExponentialBackOff} or
* calling {@link #reset()} after which {@link #nextBackOffMillis()} returns * calling {@link #reset()} after which {@link #nextBackOffMillis()} returns
* {@link BackOff#STOP}. * {@link BackOff#STOP}.
*/ */
int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS; private int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS;
/** Nano clock. */ /**
NanoClock nanoClock = NanoClock.SYSTEM; * Nano clock.
*/
private NanoClock nanoClock = NanoClock.SYSTEM;
public Builder() { public Builder() {
} }
/** Builds a new instance of {@link ExponentialBackOff}. */ /**
* Builds a new instance of {@link ExponentialBackOff}.
* */
public ExponentialBackOff build() { public ExponentialBackOff build() {
if (initialIntervalMillis <= 0) {
throw new IllegalArgumentException();
}
if (!(0 <= randomizationFactor && randomizationFactor < 1)) {
throw new IllegalArgumentException();
}
if (multiplier < 1) {
throw new IllegalArgumentException();
}
if ((maxIntervalMillis < initialIntervalMillis)) {
throw new IllegalArgumentException();
}
if (maxElapsedTimeMillis <= 0) {
throw new IllegalArgumentException();
}
return new ExponentialBackOff(this); return new ExponentialBackOff(this);
} }
@ -480,7 +521,9 @@ public class ExponentialBackOff implements BackOff {
* </p> * </p>
*/ */
public Builder setNanoClock(NanoClock nanoClock) { public Builder setNanoClock(NanoClock nanoClock) {
this.nanoClock = nanoClock; //Preconditions.checkNotNull(nanoClock); if (nanoClock != null) {
this.nanoClock = nanoClock;
}
return this; return this;
} }
} }

View file

@ -1,44 +0,0 @@
/*
* Copyright (c) 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.xbib.netty.http.client.retry;
/**
* Nano clock which can be used to measure elapsed time in nanoseconds.
*
* <p>
* The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations
* may be used for testing.
* </p>
*
* @since 1.14
* @author Yaniv Inbar
*/
public interface NanoClock {
/**
* Returns the current value of the most precise available system timer, in nanoseconds for use to
* measure elapsed time, to match the behavior of {@link System#nanoTime()}.
*/
long nanoTime();
/**
* Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
*/
NanoClock SYSTEM = new NanoClock() {
public long nanoTime() {
return System.nanoTime();
}
};
}

View file

@ -5,7 +5,9 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder; import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.handler.codec.http2.HttpConversionUtil;
@ -16,6 +18,9 @@ import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.RequestBuilder; import org.xbib.netty.http.client.RequestBuilder;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.retry.BackOff;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
@ -48,6 +53,8 @@ abstract class BaseTransport implements Transport {
protected SortedMap<Integer, Request> requests; protected SortedMap<Integer, Request> requests;
protected Throwable throwable;
private Map<Cookie, Boolean> cookieBox; private Map<Cookie, Boolean> cookieBox;
BaseTransport(Client client, HttpAddress httpAddress) { BaseTransport(Client client, HttpAddress httpAddress) {
@ -56,43 +63,55 @@ abstract class BaseTransport implements Transport {
this.requests = new ConcurrentSkipListMap<>(); this.requests = new ConcurrentSkipListMap<>();
} }
@Override
public HttpAddress httpAddress() {
return httpAddress;
}
@Override @Override
public Transport execute(Request request) throws IOException { public Transport execute(Request request) throws IOException {
ensureConnect(); ensureConnect();
// some HTTP 1.1 servers like Elasticsearch do not understand full URIs in HTTP command line if (throwable != null) {
String uri = request.httpVersion().majorVersion() < 2 ? return this;
}
// Some HTTP 1 servers do not understand URIs in HTTP command line in spite of RFC 7230.
// The "origin form" requires a "Host" header.
// Our algorithm is: use always "origin form" for HTTP 1, use absolute form for HTTP 2.
// The reason is that Netty derives the HTTP/2 scheme header from the absolute form.
String uri = request.httpVersion().majorVersion() == 1 ?
request.base().relativeReference() : request.base().toString(); request.base().relativeReference() : request.base().toString();
FullHttpRequest fullHttpRequest = request.content() == null ? FullHttpRequest fullHttpRequest = request.content() == null ?
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) : new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) :
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri, new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri,
request.content()); request.content());
logger.log(Level.INFO, fullHttpRequest.toString()); try {
Integer streamId = nextStream(); Integer streamId = nextStream();
if (streamId != null) { if (streamId != null && streamId > 0) {
request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
} else {
if (request.httpVersion().majorVersion() == 2) {
logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName());
}
}
// add matching cookies from box (previous requests) and new cookies from request builder
Collection<Cookie> cookies = new ArrayList<>();
cookies.addAll(matchCookiesFromBox(request));
cookies.addAll(matchCookies(request));
if (!cookies.isEmpty()) {
request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
}
// add stream-id and cookie headers
fullHttpRequest.headers().set(request.headers());
if (streamId != null) {
requests.put(streamId, request);
}
if (channel.isWritable()) {
channel.writeAndFlush(fullHttpRequest);
}
} finally {
request.close();
} }
// add matching cookies from box (previous requests) and new cookies from request builder
Collection<Cookie> cookies = new ArrayList<>();
cookies.addAll(matchCookiesFromBox(request));
cookies.addAll(matchCookies(request));
if (!cookies.isEmpty()) {
request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
}
// add stream-id and cookie headers
fullHttpRequest.headers().set(request.headers());
requests.put(streamId, request);
logger.log(Level.FINE, () -> "streamId = " + streamId + " writing request = " + fullHttpRequest);
channel.writeAndFlush(fullHttpRequest);
return this; return this;
} }
/** /**
* Experimental. * Experimental method for executing in a wrapping completable future.
* @param request request * @param request request
* @param supplier supplier * @param supplier supplier
* @param <T> supplier result * @param <T> supplier result
@ -102,44 +121,86 @@ abstract class BaseTransport implements Transport {
public <T> CompletableFuture<T> execute(Request request, public <T> CompletableFuture<T> execute(Request request,
Function<FullHttpResponse, T> supplier) throws IOException { Function<FullHttpResponse, T> supplier) throws IOException {
final CompletableFuture<T> completableFuture = new CompletableFuture<>(); final CompletableFuture<T> completableFuture = new CompletableFuture<>();
//request.setExceptionListener(completableFuture::completeExceptionally);
request.setResponseListener(response -> completableFuture.complete(supplier.apply(response))); request.setResponseListener(response -> completableFuture.complete(supplier.apply(response)));
execute(request); execute(request);
return completableFuture; return completableFuture;
} }
@Override @Override
public synchronized void close() { public synchronized void close() throws IOException {
get(); get();
if (channel != null) { client.releaseChannel(channel);
channel.close();
channel = null;
}
} }
protected void ensureConnect() throws IOException { @Override
if (channel == null) { public boolean isFailed() {
try { return throwable != null;
channel = client.newChannel(httpAddress); }
channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
awaitSettings(); @Override
} catch (InterruptedException e) { public Throwable getFailure() {
throw new ConnectException("unable to connect to " + httpAddress); return throwable;
}
@Override
public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
Request request = fromStreamId(streamId);
if (request != null) {
HttpHeadersListener httpHeadersListener = request.getHeadersListener();
if (httpHeadersListener != null) {
httpHeadersListener.onHeaders(httpHeaders);
}
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
addCookie(cookie);
CookieListener cookieListener = request.getCookieListener();
if (cookieListener != null) {
cookieListener.onCookie(cookie);
}
} }
} }
} }
protected Request continuation(Integer streamId, FullHttpResponse httpResponse) throws URLSyntaxException { private void ensureConnect() throws IOException {
if (channel == null) {
channel = client.newChannel(httpAddress);
if (channel != null) {
channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
awaitSettings();
} else {
ConnectException connectException;
if (httpAddress != null) {
connectException = new ConnectException("unable to connect to " + httpAddress);
} else if (client.hasPooledConnections()){
connectException = new ConnectException("unable to get channel from pool");
} else {
// if API misuse
connectException = new ConnectException("unable to get channel");
}
this.throwable = connectException;
this.channel = null;
throw connectException;
}
}
}
protected Request fromStreamId(Integer streamId) {
if (streamId == null) {
streamId = requests.lastKey();
}
return requests.get(streamId);
}
protected Request continuation(Request request, FullHttpResponse httpResponse) throws URLSyntaxException {
if (httpResponse == null) { if (httpResponse == null) {
return null; return null;
} }
Request request = fromStreamId(streamId);
if (request == null) { if (request == null) {
// push promise // push promise or something else
return null; return null;
} }
try { try {
if (request.checkRedirect()) { if (request.canRedirect()) {
int status = httpResponse.status().code(); int status = httpResponse.status().code();
switch (status) { switch (status) {
case 300: case 300:
@ -152,7 +213,7 @@ abstract class BaseTransport implements Transport {
String location = httpResponse.headers().get(HttpHeaderNames.LOCATION); String location = httpResponse.headers().get(HttpHeaderNames.LOCATION);
location = new PercentDecoder(StandardCharsets.UTF_8.newDecoder()).decode(location); location = new PercentDecoder(StandardCharsets.UTF_8.newDecoder()).decode(location);
if (location != null) { if (location != null) {
logger.log(Level.INFO, "found redirect location: " + location); logger.log(Level.FINE, "found redirect location: " + location);
URL redirUrl = URL.base(request.base()).resolve(location); URL redirUrl = URL.base(request.base()).resolve(location);
HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod(); HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod();
RequestBuilder newHttpRequestBuilder = Request.builder(method) RequestBuilder newHttpRequestBuilder = Request.builder(method)
@ -160,45 +221,75 @@ abstract class BaseTransport implements Transport {
.setVersion(request.httpVersion()) .setVersion(request.httpVersion())
.setHeaders(request.headers()) .setHeaders(request.headers())
.content(request.content()); .content(request.content());
// TODO(jprante) convencience to copy pathAndQuery from one request to another
request.base().getQueryParams().forEach(pair -> request.base().getQueryParams().forEach(pair ->
newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond()) newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond())
); );
request.cookies().forEach(newHttpRequestBuilder::addCookie); request.cookies().forEach(newHttpRequestBuilder::addCookie);
Request newHttpRequest = newHttpRequestBuilder.build(); Request newHttpRequest = newHttpRequestBuilder.build();
newHttpRequest.setResponseListener(request.getResponseListener()); newHttpRequest.setResponseListener(request.getResponseListener());
//newHttpRequest.setExceptionListener(request.getExceptionListener());
newHttpRequest.setHeadersListener(request.getHeadersListener()); newHttpRequest.setHeadersListener(request.getHeadersListener());
newHttpRequest.setCookieListener(request.getCookieListener()); newHttpRequest.setCookieListener(request.getCookieListener());
//newHttpRequest.setPushListener(request.getPushListener());
StringBuilder hostAndPort = new StringBuilder(); StringBuilder hostAndPort = new StringBuilder();
hostAndPort.append(redirUrl.getHost()); hostAndPort.append(redirUrl.getHost());
if (redirUrl.getPort() != null) { if (redirUrl.getPort() != null) {
hostAndPort.append(':').append(redirUrl.getPort()); hostAndPort.append(':').append(redirUrl.getPort());
} }
newHttpRequest.headers().set(HttpHeaderNames.HOST, hostAndPort.toString()); newHttpRequest.headers().set(HttpHeaderNames.HOST, hostAndPort.toString());
logger.log(Level.INFO, "redirect url: " + redirUrl + logger.log(Level.FINE, "redirect url: " + redirUrl +
" old request: " + request.toString() + " old request: " + request.toString() +
" new request: " + newHttpRequest.toString()); " new request: " + newHttpRequest.toString());
return newHttpRequest; return newHttpRequest;
} }
break; break;
default: default:
logger.log(Level.FINE, "no redirect because of status code " + status);
break; break;
} }
} }
} catch (MalformedInputException | UnmappableCharacterException e) { } catch (MalformedInputException | UnmappableCharacterException e) {
logger.log(Level.WARNING, e.getMessage(), e); this.throwable = e;
} }
return null; return null;
} }
protected Request fromStreamId(Integer streamId) { protected Request retry(Request request, FullHttpResponse httpResponse) {
if (streamId == null) { if (httpResponse == null) {
streamId = requests.lastKey(); return null;
} }
return requests.get(streamId); if (request == null) {
// push promise or something else
return null;
}
if (request.isBackOff()) {
BackOff backOff = request.getBackOff() != null ? request.getBackOff() :
client.getClientConfig().getBackOff();
int status = httpResponse.status().code();
switch (status) {
case 403:
case 404:
case 500:
case 502:
case 503:
case 504:
case 507:
case 509:
if (backOff != null) {
long millis = backOff.nextBackOffMillis();
if (millis != BackOff.STOP) {
logger.log(Level.FINE, "status = " + status + " backing off request by " + millis + " milliseconds");
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
// ignore
}
return request;
}
}
break;
default:
break;
}
}
return null;
} }
public void setCookieBox(Map<Cookie, Boolean> cookieBox) { public void setCookieBox(Map<Cookie, Boolean> cookieBox) {
@ -209,7 +300,7 @@ abstract class BaseTransport implements Transport {
return cookieBox; return cookieBox;
} }
public void addCookie(Cookie cookie) { private void addCookie(Cookie cookie) {
if (cookieBox == null) { if (cookieBox == null) {
this.cookieBox = Collections.synchronizedMap(new LRUCache<Cookie, Boolean>(32)); this.cookieBox = Collections.synchronizedMap(new LRUCache<Cookie, Boolean>(32));
} }
@ -241,7 +332,8 @@ abstract class BaseTransport implements Transport {
return (secureScheme && cookie.isSecure()) || (!secureScheme && !cookie.isSecure()); return (secureScheme && cookie.isSecure()) || (!secureScheme && !cookie.isSecure());
} }
class LRUCache<K, V> extends LinkedHashMap<K, V> { @SuppressWarnings("serial")
static class LRUCache<K, V> extends LinkedHashMap<K, V> {
private final int cacheSize; private final int cacheSize;

View file

@ -2,18 +2,12 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.net.URLSyntaxException; import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener; import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException; import java.io.IOException;
@ -27,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Http2Transport extends BaseTransport implements Transport { public class Http2Transport extends BaseTransport {
private static final Logger logger = Logger.getLogger(Http2Transport.class.getName()); private static final Logger logger = Logger.getLogger(Http2Transport.class.getName());
@ -41,7 +35,9 @@ public class Http2Transport extends BaseTransport implements Transport {
super(client, httpAddress); super(client, httpAddress);
streamIdCounter = new AtomicInteger(3); streamIdCounter = new AtomicInteger(3);
streamidPromiseMap = new ConcurrentSkipListMap<>(); streamidPromiseMap = new ConcurrentSkipListMap<>();
settingsPromise = new CompletableFuture<>(); settingsPromise = (httpAddress != null && httpAddress.isSecure()) ||
(client.hasPooledConnections() && client.getPool().isSecure()) ?
new CompletableFuture<>() : null;
} }
@Override @Override
@ -69,62 +65,47 @@ public class Http2Transport extends BaseTransport implements Transport {
public void awaitSettings() { public void awaitSettings() {
if (settingsPromise != null) { if (settingsPromise != null) {
try { try {
settingsPromise.get(client.getTimeout(), TimeUnit.MILLISECONDS); settingsPromise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) { } catch (InterruptedException | ExecutionException | TimeoutException e) {
settingsPromise.completeExceptionally(e); settingsPromise.completeExceptionally(e);
} }
} else {
logger.log(Level.WARNING, "waiting for settings but no promise present");
} }
} }
@Override @Override
public void responseReceived(Integer streamId, FullHttpResponse fullHttpResponse) { public void responseReceived(Integer streamId, FullHttpResponse fullHttpResponse) {
if (streamId == null) { if (streamId == null) {
logger.log(Level.WARNING, "unexpected message received: " + fullHttpResponse); logger.log(Level.WARNING, "no stream ID, unexpected message received: " + fullHttpResponse);
return; return;
} }
CompletableFuture<Boolean> promise = streamidPromiseMap.get(streamId); CompletableFuture<Boolean> promise = streamidPromiseMap.get(streamId);
if (promise == null) { if (promise == null) {
logger.log(Level.WARNING, "response received for unknown stream id " + streamId); logger.log(Level.WARNING, "response received for stream ID " + streamId + " but found no promise");
} else { return;
Request request = fromStreamId(streamId);
if (request != null) {
HttpResponseListener responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(fullHttpResponse);
}
try {
request = continuation(streamId, fullHttpResponse);
if (request != null) {
// synchronous call here
client.continuation(this, request);
}
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
}
// complete origin
promise.complete(true);
} }
}
@Override
public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
Request request = fromStreamId(streamId); Request request = fromStreamId(streamId);
if (request != null) { if (request != null) {
HttpHeadersListener httpHeadersListener = request.getHeadersListener(); HttpResponseListener responseListener = request.getResponseListener();
if (httpHeadersListener != null) { if (responseListener != null) {
httpHeadersListener.onHeaders(httpHeaders); responseListener.onResponse(fullHttpResponse);
} }
CookieListener cookieListener = request.getCookieListener(); try {
if (cookieListener != null) { Request retryRequest = retry(request, fullHttpResponse);
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { if (retryRequest != null) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); // retry transport, wait for completion
cookieListener.onCookie(cookie); client.retry(this, retryRequest);
} else {
Request continueRequest = continuation(request, fullHttpResponse);
if (continueRequest != null) {
// continue with new transport, synchronous call here, wait for completion
client.continuation(this, continueRequest);
}
} }
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
} }
} }
promise.complete(true);
} }
@Override @Override
@ -134,16 +115,25 @@ public class Http2Transport extends BaseTransport implements Transport {
} }
@Override @Override
public void awaitResponse(Integer streamId) { public void awaitResponse(Integer streamId) throws IOException {
if (streamId == null) { if (streamId == null) {
return; return;
} }
if (throwable != null) {
return;
}
CompletableFuture<Boolean> promise = streamidPromiseMap.get(streamId); CompletableFuture<Boolean> promise = streamidPromiseMap.get(streamId);
if (promise != null) { if (promise != null) {
try { try {
promise.get(client.getTimeout(), TimeUnit.MILLISECONDS); long millis = client.getClientConfig().getReadTimeoutMillis();
Request request = fromStreamId(streamId);
if (request != null && request.getTimeoutInMillis() > 0) {
millis = request.getTimeoutInMillis();
}
promise.get(millis, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) { } catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.log(Level.WARNING, "streamId=" + streamId + " " + e.getMessage(), e); this.throwable = e;
throw new IOException(e);
} finally { } finally {
streamidPromiseMap.remove(streamId); streamidPromiseMap.remove(streamId);
} }
@ -153,7 +143,14 @@ public class Http2Transport extends BaseTransport implements Transport {
@Override @Override
public Transport get() { public Transport get() {
for (Integer streamId : streamidPromiseMap.keySet()) { for (Integer streamId : streamidPromiseMap.keySet()) {
awaitResponse(streamId); try {
awaitResponse(streamId);
} catch (IOException e) {
notifyRequest(streamId, e);
}
}
if (throwable != null) {
streamidPromiseMap.clear();
} }
return this; return this;
} }
@ -165,10 +162,32 @@ public class Http2Transport extends BaseTransport implements Transport {
} }
} }
/**
* The underlying network layer failed, not possible to know the request.
* So we fail all (open) promises.
* @param throwable the exception
*/
@Override @Override
public void fail(Throwable throwable) { public void fail(Throwable throwable) {
// fail fast, do not fail more than once
if (this.throwable != null) {
return;
}
this.throwable = throwable;
for (CompletableFuture<Boolean> promise : streamidPromiseMap.values()) { for (CompletableFuture<Boolean> promise : streamidPromiseMap.values()) {
promise.completeExceptionally(throwable); promise.completeExceptionally(throwable);
} }
} }
/**
* Try to notify request about failure.
* @param streamId stream ID
* @param throwable the exception
*/
private void notifyRequest(Integer streamId, Throwable throwable) {
Request request = fromStreamId(streamId);
if (request != null && request.getCompletableFuture() != null) {
request.getCompletableFuture().completeExceptionally(throwable);
}
}
} }

View file

@ -2,18 +2,12 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.net.URLSyntaxException; import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener; import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException; import java.io.IOException;
@ -27,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class HttpTransport extends BaseTransport implements Transport { public class HttpTransport extends BaseTransport {
private static final Logger logger = Logger.getLogger(HttpTransport.class.getName()); private static final Logger logger = Logger.getLogger(HttpTransport.class.getName());
@ -43,7 +37,7 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override @Override
public Integer nextStream() { public Integer nextStream() {
Integer streamId = sequentialCounter.getAndAdd(1); Integer streamId = sequentialCounter.getAndIncrement();
if (streamId == Integer.MIN_VALUE) { if (streamId == Integer.MIN_VALUE) {
// reset if overflow, Java wraps atomic integers to Integer.MIN_VALUE // reset if overflow, Java wraps atomic integers to Integer.MIN_VALUE
sequentialCounter.set(0); sequentialCounter.set(0);
@ -71,9 +65,18 @@ public class HttpTransport extends BaseTransport implements Transport {
} }
} }
try { try {
request = continuation(null, fullHttpResponse); Request retryRequest = retry(request, fullHttpResponse);
if (request != null) { if (retryRequest != null) {
client.continuation(this, request); // retry transport, wait for completion
client.retry(this, retryRequest);
retryRequest.close();
} else {
Request continueRequest = continuation(request, fullHttpResponse);
if (continueRequest != null) {
// continue with new transport, synchronous call here, wait for completion
client.continuation(this, continueRequest);
continueRequest.close();
}
} }
} catch (URLSyntaxException | IOException e) { } catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e); logger.log(Level.WARNING, e.getMessage(), e);
@ -86,40 +89,25 @@ public class HttpTransport extends BaseTransport implements Transport {
} }
} }
@Override
public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
Request request = fromStreamId(streamId);
if (request != null) {
HttpHeadersListener httpHeadersListener = request.getHeadersListener();
if (httpHeadersListener != null) {
httpHeadersListener.onHeaders(httpHeaders);
}
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
addCookie(cookie);
CookieListener cookieListener = request.getCookieListener();
if (cookieListener != null) {
cookieListener.onCookie(cookie);
}
}
}
}
@Override @Override
public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) { public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) {
} }
@Override @Override
public void awaitResponse(Integer streamId) { public void awaitResponse(Integer streamId) throws IOException {
if (streamId == null) { if (streamId == null) {
return; return;
} }
if (throwable != null) {
return;
}
CompletableFuture<Boolean> promise = sequentialPromiseMap.get(streamId); CompletableFuture<Boolean> promise = sequentialPromiseMap.get(streamId);
if (promise != null) { if (promise != null) {
try { try {
promise.get(client.getTimeout(), TimeUnit.MILLISECONDS); promise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) { } catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.log(Level.WARNING, "streamId=" + streamId + " " + e.getMessage(), e); this.throwable = e;
throw new IOException(e);
} finally { } finally {
sequentialPromiseMap.remove(streamId); sequentialPromiseMap.remove(streamId);
} }
@ -128,8 +116,15 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override @Override
public Transport get() { public Transport get() {
for (Integer streamId : sequentialPromiseMap.keySet()) { try {
awaitResponse(streamId); for (Integer streamId : sequentialPromiseMap.keySet()) {
awaitResponse(streamId);
client.releaseChannel(channel);
}
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
} finally {
sequentialPromiseMap.clear();
} }
return this; return this;
} }
@ -143,9 +138,9 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override @Override
public void fail(Throwable throwable) { public void fail(Throwable throwable) {
this.throwable = throwable;
for (CompletableFuture<Boolean> promise : sequentialPromiseMap.values()) { for (CompletableFuture<Boolean> promise : sequentialPromiseMap.values()) {
promise.completeExceptionally(throwable); promise.completeExceptionally(throwable);
} }
} }
} }

View file

@ -7,7 +7,6 @@ import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.Request;
import java.io.IOException; import java.io.IOException;
@ -19,8 +18,6 @@ public interface Transport {
AttributeKey<Transport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport"); AttributeKey<Transport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport");
HttpAddress httpAddress();
Transport execute(Request request) throws IOException; Transport execute(Request request) throws IOException;
<T> CompletableFuture<T> execute(Request request, Function<FullHttpResponse, T> supplier) throws IOException; <T> CompletableFuture<T> execute(Request request, Function<FullHttpResponse, T> supplier) throws IOException;
@ -41,7 +38,7 @@ public interface Transport {
void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers); void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers);
void awaitResponse(Integer streamId); void awaitResponse(Integer streamId) throws IOException;
Transport get(); Transport get();
@ -49,5 +46,9 @@ public interface Transport {
void fail(Throwable throwable); void fail(Throwable throwable);
void close(); boolean isFailed();
Throwable getFailure();
void close() throws IOException;
} }

View file

@ -395,10 +395,8 @@ public class NetworkUtils {
if (predicate.test(networkInterface)) { if (predicate.test(networkInterface)) {
networkInterfaces.add(networkInterface); networkInterfaces.add(networkInterface);
Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces(); Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces();
if (subInterfaces.hasMoreElements()) { while (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) { networkInterfaces.add(subInterfaces.nextElement());
networkInterfaces.add(subInterfaces.nextElement());
}
} }
} }
} }

View file

@ -26,7 +26,7 @@ public class CompletableFutureTest {
final Function<FullHttpResponse, String> httpResponseStringFunction = response -> final Function<FullHttpResponse, String> httpResponseStringFunction = response ->
response.content().toString(StandardCharsets.UTF_8); response.content().toString(StandardCharsets.UTF_8);
Request request = Request.get() Request request = Request.get()
.url("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml") .url("https://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1")
.build(); .build();
CompletableFuture<String> completableFuture = client.execute(request, httpResponseStringFunction) CompletableFuture<String> completableFuture = client.execute(request, httpResponseStringFunction)
.exceptionally(Throwable::getMessage) .exceptionally(Throwable::getMessage)

View file

@ -17,15 +17,14 @@ public class ConscryptTest extends LoggingBase {
@Test @Test
public void testConscrypt() throws IOException { public void testConscrypt() throws IOException {
Client client = Client.builder() Client client = Client.builder()
.enableDebug()
.setJdkSslProvider() .setJdkSslProvider()
.setSslContextProvider(Conscrypt.newProvider()) .setSslContextProvider(Conscrypt.newProvider())
.build(); .build();
logger.log(Level.INFO, client.getClientConfig().toString()); logger.log(Level.INFO, client.getClientConfig().toString());
try { try {
Request request = Request.get() Request request = Request.get()
.url("https://fl-test.hbz-nrw.de") .url("https://xbib.org")
.setVersion("HTTP/2.0") .setVersion("HTTP/1.1")
.build() .build()
.setResponseListener(fullHttpResponse -> { .setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);

View file

@ -26,7 +26,7 @@ public class CookieSetterHttpBinTest extends LoggingBase {
* } * }
* } * }
* </pre> * </pre>
* @throws Exception * @throws IOException if test fails
*/ */
@Test @Test
public void testHttpBinCookies() throws IOException { public void testHttpBinCookies() throws IOException {

View file

@ -83,7 +83,7 @@ public class ElasticsearchTest extends LoggingBase {
.build() .build()
.setResponseListener(fullHttpResponse -> .setResponseListener(fullHttpResponse ->
logger.log(Level.FINE, "status = " + fullHttpResponse.status() + logger.log(Level.FINE, "status = " + fullHttpResponse.status() +
" counter = " + count.incrementAndGet() + " counter = " + count.getAndIncrement() +
" response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8))); " response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8)));
} }

View file

@ -1,24 +1,34 @@
package org.xbib.netty.http.client.test; package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.Request;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Http1Test { public class Http1Test extends LoggingBase {
private static final Logger logger = Logger.getLogger(Http1Test.class.getName()); private static final Logger logger = Logger.getLogger(Http1Test.class.getName());
@After
public void checkThreads() {
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
logger.log(Level.INFO, "threads = " + threadSet.size() );
threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString()));
}
@Test @Test
public void testHttp1() throws Exception { public void testHttp1() throws Exception {
Client client = new Client(); Client client = new Client();
try { try {
Request request = Request.get().url("http://fl.hbz-nrw.de").build() Request request = Request.get().url("http://xbib.org").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() + msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) + msg.content().toString(StandardCharsets.UTF_8) +
@ -30,26 +40,29 @@ public class Http1Test {
} }
@Test @Test
public void testHttp1ParallelRequests() throws IOException { @Ignore
Client client = new Client(); public void testParallelRequests() throws IOException {
Client client = Client.builder().enableDebug().build();
try { try {
Request request1 = Request.builder(HttpMethod.GET) Request request1 = Request.builder(HttpMethod.GET)
.url("http://fl.hbz-nrw.de").setVersion("HTTP/1.1") .url("http://xbib.org").setVersion("HTTP/1.1")
.build() .build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() + msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) + //msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code())); " status=" + msg.status().code()));
Request request2 = Request.builder(HttpMethod.GET) Request request2 = Request.builder(HttpMethod.GET)
.url("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1") .url("http://xbib.org").setVersion("HTTP/1.1")
.build() .build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() + msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) + //msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code())); " status=" + msg.status().code()));
client.execute(request1); for (int i = 0; i < 10; i++) {
client.execute(request2); client.execute(request1);
client.execute(request2);
}
} finally { } finally {
client.shutdownGracefully(); client.shutdownGracefully();
@ -57,7 +70,8 @@ public class Http1Test {
} }
@Test @Test
public void testTwoTransports() throws Exception { @Ignore
public void testSequentialRequests() throws Exception {
Client client = Client.builder().enableDebug().build(); Client client = Client.builder().enableDebug().build();
try { try {
Request request1 = Request.get().url("http://xbib.org").build() Request request1 = Request.get().url("http://xbib.org").build()

View file

@ -11,25 +11,39 @@ import java.nio.charset.StandardCharsets;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Http2Test { public class Http2Test extends LoggingBase {
private static final Logger logger = Logger.getLogger(Http2Test.class.getName()); private static final Logger logger = Logger.getLogger(Http2Test.class.getName());
/** /**
* Problems with akamai:
*
* 2018-03-07 16:02:52.385 FEIN [client] io.netty.handler.codec.http2.Http2FrameLogger logRstStream
* [id: 0x57cc65bb, L:/10.1.1.94:52834 - R:http2.akamai.com/104.94.191.203:443] INBOUND RST_STREAM: streamId=2 errorCode=8
* 2018-03-07 16:02:52.385 FEIN [client] io.netty.handler.codec.http2.Http2FrameLogger logGoAway
* [id: 0x57cc65bb, L:/10.1.1.94:52834 - R:http2.akamai.com/104.94.191.203:443] OUTBOUND GO_AWAY: lastStreamId=2 errorCode=0 length=0 bytes=
*
* demo/h2_demo_frame.html sends no content, only a push promise, and does not continue
*
* @throws IOException
*/ */
@Test @Test
@Ignore
public void testAkamai() throws IOException { public void testAkamai() throws IOException {
Client client = Client.builder().enableDebug().build(); Client client = Client.builder()
.enableDebug()
.addServerNameForIdentification("http2.akamai.com")
.build();
try { try {
Request request = Request.get() Request request = Request.get()
.url("https://http2.akamai.com/demo/h2_demo_frame.html") .url("https://http2.akamai.com/demo/h2_demo_frame.html")
//.url("https://http2.akamai.com/") //.url("https://http2.akamai.com/")
.setVersion("HTTP/2.0") .setVersion("HTTP/2.0")
.build() .build()
.setResponseListener(fullHttpResponse -> { .setResponseListener(msg -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); String response = msg.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() logger.log(Level.INFO, "status = " + msg.status() +
+ " response body = " + response); msg.headers().entries() + " " + response);
}); });
client.execute(request).get(); client.execute(request).get();
} finally { } finally {
@ -55,17 +69,18 @@ public class Http2Test {
@Test @Test
public void testHttp2PushIO() throws IOException { public void testHttp2PushIO() throws IOException {
//String url = "https://webtide.com";
String url = "https://http2-push.io"; String url = "https://http2-push.io";
// TODO register push announces into promises in order to wait for them all. Client client = Client.builder()
Client client = Client.builder().enableDebug().build(); .enableDebug()
.addServerNameForIdentification("http2-push.io")
.build();
try { try {
Request request = Request.builder(HttpMethod.GET) Request request = Request.builder(HttpMethod.GET)
.url(url).setVersion("HTTP/2.0") .url(url).setVersion("HTTP/2.0")
.build() .build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() + msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) + //msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code())); " status=" + msg.status().code()));
client.execute(request).get(); client.execute(request).get();
@ -75,7 +90,7 @@ public class Http2Test {
} }
@Test @Test
public void testWebtideTwoRequestsOnSameConnection() { public void testWebtideTwoRequestsOnSameConnection() throws IOException {
Client client = new Client(); Client client = new Client();
try { try {
Request request1 = Request.builder(HttpMethod.GET) Request request1 = Request.builder(HttpMethod.GET)
@ -95,8 +110,6 @@ public class Http2Test {
" status=" + msg.status().code())); " status=" + msg.status().code()));
client.execute(request1).execute(request2); client.execute(request1).execute(request2);
} catch (IOException e) {
//
} finally { } finally {
client.shutdownGracefully(); client.shutdownGracefully();
} }

View file

@ -0,0 +1,28 @@
package org.xbib.netty.http.client.test;
import org.junit.After;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import java.io.IOException;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
public class LeakTest {
private static final Logger logger = Logger.getLogger(LeakTest.class.getName());
@After
public void checkThreads() {
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
logger.log(Level.INFO, "threads = " + threadSet.size() );
threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString()));
}
@Test
public void testForLeaks() throws IOException, InterruptedException {
Client client = new Client();
client.shutdownGracefully();
}
}

View file

@ -17,7 +17,7 @@ public class LoggingBase {
Handler handler = new ConsoleHandler(); Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter()); handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler); rootLogger.addHandler(handler);
rootLogger.setLevel(Level.INFO); rootLogger.setLevel(Level.ALL);
for (Handler h : rootLogger.getHandlers()) { for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter()); handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.ALL); h.setLevel(Level.ALL);

View file

@ -0,0 +1,65 @@
package org.xbib.netty.http.client.test;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
public class PooledClientTest extends LoggingBase {
private static final Logger logger = Logger.getLogger("");
@Test
public void testPooledClientWithSingleNode() throws IOException {
int loop = 10;
HttpAddress httpAddress = HttpAddress.http1("xbib.org", 80);
Client client = Client.builder()
.addPoolNode(httpAddress)
.setPoolSecure(httpAddress.isSecure())
.setPoolNodeConnectionLimit(16)
.build();
AtomicInteger count = new AtomicInteger();
try {
int threads = 16;
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int n = 0; n < threads; n++) {
executorService.submit(() -> {
try {
logger.log(Level.INFO, "starting " + Thread.currentThread());
for (int i = 0; i < loop; i++) {
Request request = Request.get()
.url("http://xbib.org/repository/")
.setVersion("HTTP/1.1")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
//logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
});
client.pooledExecute(request).get();
count.getAndIncrement();
}
logger.log(Level.INFO, "done " + Thread.currentThread());
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.log(Level.WARNING, e.getMessage(), e);
} finally {
client.shutdownGracefully();
}
logger.log(Level.INFO, "count = " + count.get());
}
}

View file

@ -88,11 +88,11 @@ public class XbibTest extends LoggingBase {
@Test @Test
public void testXbibOrgWithVeryShortReadTimeout() throws IOException { public void testXbibOrgWithVeryShortReadTimeout() throws IOException {
Client httpClient = Client.builder() Client httpClient = Client.builder()
.setReadTimeoutMillis(50)
.build(); .build();
try { try {
httpClient.execute(Request.get() httpClient.execute(Request.get()
.url("http://xbib.org") .url("http://xbib.org")
.setTimeoutInMillis(10)
.build() .build()
.setResponseListener(fullHttpResponse -> { .setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);

View file

@ -13,13 +13,14 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpVersion;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool; import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool; import org.xbib.netty.http.client.pool.BoundedChannelPool;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -34,6 +35,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@Ignore
public class EpollTest { public class EpollTest {
private static final Logger logger = Logger.getLogger(EpollTest.class.getName()); private static final Logger logger = Logger.getLogger(EpollTest.class.getName());
@ -74,7 +76,8 @@ public class EpollTest {
.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true); .option(ChannelOption.TCP_NODELAY, true);
channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0); channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,false,
NODES, bootstrap, null, 0);
channelPool.prepare(CONCURRENCY); channelPool.prepare(CONCURRENCY);
} }
@ -85,7 +88,6 @@ public class EpollTest {
mockEpollServer.close(); mockEpollServer.close();
} }
@Ignore
@Test @Test
public void testPoolEpoll() throws Exception { public void testPoolEpoll() throws Exception {
LongAdder longAdder = new LongAdder(); LongAdder longAdder = new LongAdder();

View file

@ -12,12 +12,13 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpVersion;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool; import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool; import org.xbib.netty.http.client.pool.BoundedChannelPool;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -72,7 +73,8 @@ public class NioTest {
.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true); .option(ChannelOption.TCP_NODELAY, true);
channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0); channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,false,
NODES, bootstrap, null, 0);
channelPool.prepare(CONCURRENCY); channelPool.prepare(CONCURRENCY);
} }

View file

@ -2,6 +2,14 @@ package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.BoundedChannelPool;
import org.xbib.netty.http.client.pool.Pool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -17,21 +25,13 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import io.netty.util.AttributeKey;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class SimplePoolTest { public class PoolTest {
private static final Logger logger = Logger.getLogger(SimplePoolTest.class.getName()); private static final Logger logger = Logger.getLogger(PoolTest.class.getName());
private static final int TEST_STEP_TIME_SECONDS = 50; private static final int TEST_STEP_TIME_SECONDS = 50;
@ -51,14 +51,14 @@ public class SimplePoolTest {
}); });
} }
public SimplePoolTest(int concurrencyLevel, int nodeCount) { public PoolTest(int concurrencyLevel, int nodeCount) {
this.nodeCount = nodeCount; this.nodeCount = nodeCount;
List<HttpAddress> nodes = new ArrayList<>(); List<HttpAddress> nodes = new ArrayList<>();
for (int i = 0; i < nodeCount; i ++) { for (int i = 0; i < nodeCount; i ++) {
nodes.add(HttpAddress.http1("localhost" + i)); nodes.add(HttpAddress.http1("localhost" + i));
} }
try (Pool<Channel> pool = new SimpleChannelPool<>(new Semaphore(concurrencyLevel), nodes, new Bootstrap(), try (Pool<Channel> pool = new BoundedChannelPool<>(new Semaphore(concurrencyLevel), HttpVersion.HTTP_1_1, false,
null, 0)) { nodes, new Bootstrap(), null, 0)) {
int n = Runtime.getRuntime().availableProcessors(); int n = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(n); ExecutorService executorService = Executors.newFixedThreadPool(n);
for(int i = 0; i < n; i ++) { for(int i = 0; i < n; i ++) {

View file

@ -3,7 +3,6 @@ package org.xbib.netty.http.client.test.retry;
import org.junit.Test; import org.junit.Test;
import org.xbib.netty.http.client.retry.BackOff; import org.xbib.netty.http.client.retry.BackOff;
import org.xbib.netty.http.client.retry.ExponentialBackOff; import org.xbib.netty.http.client.retry.ExponentialBackOff;
import org.xbib.netty.http.client.retry.NanoClock;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -136,7 +135,7 @@ public class ExponentialBackOffTest {
assertEquals(testMaxInterval, backOffPolicy.getCurrentIntervalMillis()); assertEquals(testMaxInterval, backOffPolicy.getCurrentIntervalMillis());
} }
static class MyNanoClock implements NanoClock { static class MyNanoClock implements ExponentialBackOff.NanoClock {
private int i = 0; private int i = 0;
private long startSeconds; private long startSeconds;

View file

@ -23,11 +23,13 @@ public class MockBackOff implements BackOff {
/** Number of tries so far. */ /** Number of tries so far. */
private int numTries; private int numTries;
public void reset() throws IOException { @Override
public void reset() {
numTries = 0; numTries = 0;
} }
public long nextBackOffMillis() throws IOException { @Override
public long nextBackOffMillis() {
if (numTries >= maxTries || backOffMillis == STOP) { if (numTries >= maxTries || backOffMillis == STOP) {
return STOP; return STOP;
} }

View file

@ -1,11 +1,9 @@
package org.xbib.netty.http.client.test.simple; package org.xbib.netty.http.client.test.simple;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
@ -20,11 +18,8 @@ import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
@ -32,6 +27,7 @@ import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -46,17 +42,26 @@ public class SimpleHttp1Test {
private static final Logger logger = Logger.getLogger(SimpleHttp1Test.class.getName()); private static final Logger logger = Logger.getLogger(SimpleHttp1Test.class.getName());
@After
public void checkThreads() {
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
logger.log(Level.INFO, "threads = " + threadSet.size() );
threadSet.forEach( thread -> {
if (thread.getName().equals("ObjectCleanerThread")) {
logger.log(Level.INFO, thread.toString());
}
});
}
@Test @Test
public void testHttp1() throws Exception { public void testHttp1() throws Exception {
Client client = new Client(); Client client = new Client();
try { try {
HttpTransport transport = client.newTransport("fl.hbz-nrw.de", 80); HttpTransport transport = client.newTransport("xbib.org", 80);
transport.onResponse(string -> logger.log(Level.INFO, "got messsage: " + string)); transport.onResponse(string -> logger.log(Level.INFO, "got messsage: " + string));
transport.connect(); transport.connect();
transport.awaitSettings();
sendRequest(transport); sendRequest(transport);
transport.awaitResponses(); transport.awaitResponse();
transport.close();
} finally { } finally {
client.shutdown(); client.shutdown();
} }
@ -67,20 +72,18 @@ public class SimpleHttp1Test {
if (channel == null) { if (channel == null) {
return; return;
} }
Integer streamId = transport.nextStream();
String host = transport.inetSocketAddress().getHostString(); String host = transport.inetSocketAddress().getHostString();
int port = transport.inetSocketAddress().getPort(); int port = transport.inetSocketAddress().getPort();
String uri = "https://" + host + ":" + port; String uri = "http://" + host + ":" + port;
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
request.headers().add(HttpHeaderNames.HOST, host + ":" + port); request.headers().add(HttpHeaderNames.HOST, host + ":" + port);
request.headers().add(HttpHeaderNames.USER_AGENT, "Java"); request.headers().add(HttpHeaderNames.USER_AGENT, "Java");
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE);
if (streamId != null) {
request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
}
logger.log(Level.INFO, () -> "writing request = " + request); logger.log(Level.INFO, () -> "writing request = " + request);
channel.writeAndFlush(request); if (channel.isWritable()) {
channel.writeAndFlush(request);
}
} }
private AttributeKey<HttpTransport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport"); private AttributeKey<HttpTransport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport");
@ -115,16 +118,14 @@ public class SimpleHttp1Test {
return bootstrap; return bootstrap;
} }
Initializer initializer() {
return initializer;
}
HttpResponseHandler responseHandler() {
return httpResponseHandler;
}
void shutdown() { void shutdown() {
close();
eventLoopGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully();
try {
eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
} }
HttpTransport newTransport(String host, int port) { HttpTransport newTransport(String host, int port) {
@ -133,18 +134,11 @@ public class SimpleHttp1Test {
return transport; return transport;
} }
List<HttpTransport> transports() { synchronized void close() {
return transports;
}
void close(HttpTransport transport) {
transports.remove(transport);
}
void close() {
for (HttpTransport transport : transports) { for (HttpTransport transport : transports) {
transport.close(); transport.close();
} }
transports.clear();
} }
} }
@ -165,10 +159,6 @@ public class SimpleHttp1Test {
this.inetSocketAddress = inetSocketAddress; this.inetSocketAddress = inetSocketAddress;
} }
Client client() {
return client;
}
InetSocketAddress inetSocketAddress() { InetSocketAddress inetSocketAddress() {
return inetSocketAddress; return inetSocketAddress;
} }
@ -176,52 +166,33 @@ public class SimpleHttp1Test {
void connect() throws InterruptedException { void connect() throws InterruptedException {
channel = client.bootstrap().connect(inetSocketAddress).sync().await().channel(); channel = client.bootstrap().connect(inetSocketAddress).sync().await().channel();
channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this); channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
promise = new CompletableFuture<>();
} }
Channel channel() { Channel channel() {
return channel; return channel;
} }
Integer nextStream() {
promise = new CompletableFuture<>();
return null;
}
void onResponse(ResponseWriter responseWriter) { void onResponse(ResponseWriter responseWriter) {
this.responseWriter = responseWriter; this.responseWriter = responseWriter;
} }
void settingsReceived(Channel channel, Http2Settings http2Settings) { void responseReceived(FullHttpResponse msg) {
} if (responseWriter != null) {
responseWriter.write(msg.content().toString(StandardCharsets.UTF_8));
void awaitSettings() {
}
void responseReceived(Integer streamId, String message) {
if (promise == null) {
logger.log(Level.WARNING, "message received for unknown stream id " + streamId);
} else {
if (responseWriter != null) {
responseWriter.write(message);
}
} }
} }
void awaitResponse(Integer streamId) {
void awaitResponse() {
if (promise != null) { if (promise != null) {
try { try {
logger.log(Level.INFO, "waiting for response");
promise.get(5, TimeUnit.SECONDS); promise.get(5, TimeUnit.SECONDS);
logger.log(Level.INFO, "response received");
} catch (InterruptedException | ExecutionException | TimeoutException e) { } catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.log(Level.WARNING, e.getMessage(), e); logger.log(Level.WARNING, e.getMessage(), e);
} }
} }
} }
void awaitResponses() {
awaitResponse(null);
}
void complete() { void complete() {
if (promise != null) { if (promise != null) {
promise.complete(true); promise.complete(true);
@ -238,7 +209,6 @@ public class SimpleHttp1Test {
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
} }
client.close(this);
} }
} }
@ -252,7 +222,6 @@ public class SimpleHttp1Test {
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new TrafficLoggingHandler());
ch.pipeline().addLast(new HttpClientCodec()); ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1048576)); ch.pipeline().addLast(new HttpObjectAggregator(1048576));
ch.pipeline().addLast(httpResponseHandler); ch.pipeline().addLast(httpResponseHandler);
@ -265,7 +234,7 @@ public class SimpleHttp1Test {
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
HttpTransport transport = ctx.channel().attr(TRANSPORT_ATTRIBUTE_KEY).get(); HttpTransport transport = ctx.channel().attr(TRANSPORT_ATTRIBUTE_KEY).get();
if (msg.content().isReadable()) { if (msg.content().isReadable()) {
transport.responseReceived(null, msg.content().toString(StandardCharsets.UTF_8)); transport.responseReceived(msg);
} }
} }
@ -290,35 +259,4 @@ public class SimpleHttp1Test {
ctx.channel().close(); ctx.channel().close();
} }
} }
class TrafficLoggingHandler extends LoggingHandler {
TrafficLoggingHandler() {
super("client", LogLevel.INFO);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
ctx.fireChannelUnregistered();
}
@Override
public void flush(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
} }