add simple channel pool, add SSL provider (Conscrypt), fix ClientTest, add debug config
This commit is contained in:
parent
f25291a506
commit
047ae5bffd
14 changed files with 559 additions and 63 deletions
|
@ -34,10 +34,11 @@ configurations {
|
|||
}
|
||||
|
||||
dependencies {
|
||||
compile "org.xbib:net-url:${project.property('xbib-net-url.version')}"
|
||||
compile "io.netty:netty-codec-http2:${project.property('netty.version')}"
|
||||
compile "io.netty:netty-handler-proxy:${project.property('netty.version')}"
|
||||
compile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
|
||||
compile "org.xbib:net-url:${project.property('xbib-net-url.version')}"
|
||||
testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
|
||||
testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}"
|
||||
testCompile "junit:junit:${project.property('junit.version')}"
|
||||
testCompile "com.fasterxml.jackson.core:jackson-databind:${project.property('jackson.version')}"
|
||||
alpnagent "org.mortbay.jetty.alpn:jetty-alpn-agent:${project.property('alpnagent.version')}"
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
group = org.xbib
|
||||
name = netty-http-client
|
||||
version = 4.1.19.0
|
||||
version = 4.1.19.1
|
||||
|
||||
netty.version = 4.1.19.Final
|
||||
tcnative.version = 2.0.1.Final
|
||||
tcnative.version = 2.0.7.Final
|
||||
conscrypt.version = 1.0.1
|
||||
xbib-net-url.version = 1.1.0
|
||||
alpnagent.version = 2.0.7
|
||||
junit.version = 4.12
|
||||
|
|
|
@ -100,6 +100,10 @@ public final class Client {
|
|||
return new ClientBuilder();
|
||||
}
|
||||
|
||||
public ClientConfig getClientConfig() {
|
||||
return clientConfig;
|
||||
}
|
||||
|
||||
public ByteBufAllocator getByteBufAllocator() {
|
||||
return byteBufAllocator;
|
||||
}
|
||||
|
@ -145,6 +149,17 @@ public final class Client {
|
|||
.connect(httpAddress.getInetSocketAddress()).sync().await().channel();
|
||||
}
|
||||
|
||||
public Transport execute(Request request) {
|
||||
Transport nextTransport = newTransport(HttpAddress.of(request));
|
||||
nextTransport.execute(request);
|
||||
return nextTransport;
|
||||
}
|
||||
|
||||
public <T> CompletableFuture<T> execute(Request request,
|
||||
Function<FullHttpResponse, T> supplier) {
|
||||
return newTransport(HttpAddress.of(request)).execute(request, supplier);
|
||||
}
|
||||
|
||||
/**
|
||||
* For following redirects by a chain of transports.
|
||||
* @param transport the previous transport
|
||||
|
@ -163,16 +178,6 @@ public final class Client {
|
|||
close(nextTransport);
|
||||
}
|
||||
|
||||
public Transport execute(Request request) {
|
||||
Transport nextTransport = newTransport(HttpAddress.of(request));
|
||||
nextTransport.execute(request);
|
||||
return nextTransport;
|
||||
}
|
||||
|
||||
public <T> CompletableFuture<T> execute(Request request,
|
||||
Function<FullHttpResponse, T> supplier) {
|
||||
return newTransport(HttpAddress.of(request)).execute(request, supplier);
|
||||
}
|
||||
|
||||
public Transport prepareRequest(Request request) {
|
||||
return newTransport(HttpAddress.of(request));
|
||||
|
|
|
@ -9,6 +9,7 @@ import io.netty.handler.ssl.SslProvider;
|
|||
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import java.io.InputStream;
|
||||
import java.security.Provider;
|
||||
|
||||
public class ClientBuilder {
|
||||
|
||||
|
@ -24,6 +25,16 @@ public class ClientBuilder {
|
|||
this.clientConfig = new ClientConfig();
|
||||
}
|
||||
|
||||
public ClientBuilder enableDebug() {
|
||||
clientConfig.enableDebug();
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClientBuilder disableDebug() {
|
||||
clientConfig.disableDebug();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set byte buf allocator for payload in HTTP requests.
|
||||
* @param byteBufAllocator the byte buf allocator
|
||||
|
@ -104,11 +115,6 @@ public class ClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ClientBuilder setMaxConnections(int maxConnections) {
|
||||
clientConfig.setMaxConnections(maxConnections);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClientBuilder setReadTimeoutMillis(int readTimeoutMillis) {
|
||||
clientConfig.setReadTimeoutMillis(readTimeoutMillis);
|
||||
return this;
|
||||
|
@ -134,6 +140,11 @@ public class ClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ClientBuilder setSslContextProvider(Provider provider) {
|
||||
clientConfig.setSslContextProvider(provider);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClientBuilder setCiphers(Iterable<String> ciphers) {
|
||||
clientConfig.setCiphers(ciphers);
|
||||
return this;
|
||||
|
|
|
@ -10,11 +10,17 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
|||
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import java.io.InputStream;
|
||||
import java.security.Provider;
|
||||
|
||||
public class ClientConfig {
|
||||
|
||||
interface Defaults {
|
||||
|
||||
/**
|
||||
* If frame logging /traffic logging is enabled or not.
|
||||
*/
|
||||
boolean DEBUG = false;
|
||||
|
||||
/**
|
||||
* Default for thread count.
|
||||
*/
|
||||
|
@ -74,12 +80,6 @@ public class ClientConfig {
|
|||
*/
|
||||
int MAX_COMPOSITE_BUFFER_COMPONENTS = 1024;
|
||||
|
||||
/**
|
||||
* Allow maximum concurrent connections.
|
||||
* Usually, browsers restrict concurrent connections to 8 for a single address.
|
||||
*/
|
||||
int MAX_CONNECTIONS = 8;
|
||||
|
||||
/**
|
||||
* Default read/write timeout in milliseconds.
|
||||
*/
|
||||
|
@ -93,11 +93,15 @@ public class ClientConfig {
|
|||
/**
|
||||
* Default SSL provider.
|
||||
*/
|
||||
SslProvider SSL_PROVIDER = OpenSsl.isAvailable() && OpenSsl.isAlpnSupported() ?
|
||||
SslProvider.OPENSSL : SslProvider.JDK;
|
||||
SslProvider SSL_PROVIDER = SslProvider.JDK;
|
||||
|
||||
/**
|
||||
* Default ciphers.
|
||||
* Default SSL context provider (for JDK SSL only).
|
||||
*/
|
||||
Provider SSL_CONTEXT_PROVIDER = null;
|
||||
|
||||
/**
|
||||
* Default ciphers. We care about HTTP/2.
|
||||
*/
|
||||
Iterable<String> CIPHERS = Http2SecurityUtil.CIPHERS;
|
||||
|
||||
|
@ -119,6 +123,7 @@ public class ClientConfig {
|
|||
ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE;
|
||||
}
|
||||
|
||||
private boolean debug = Defaults.DEBUG;
|
||||
|
||||
/**
|
||||
* If set to 0, then Netty will decide about thread count.
|
||||
|
@ -142,8 +147,6 @@ public class ClientConfig {
|
|||
|
||||
private int maxChunkSize = Defaults.MAX_CHUNK_SIZE;
|
||||
|
||||
private int maxConnections = Defaults.MAX_CONNECTIONS;
|
||||
|
||||
private int maxContentLength = Defaults.MAX_CONTENT_LENGTH;
|
||||
|
||||
private int maxCompositeBufferComponents = Defaults.MAX_COMPOSITE_BUFFER_COMPONENTS;
|
||||
|
@ -156,6 +159,8 @@ public class ClientConfig {
|
|||
|
||||
private SslProvider sslProvider = Defaults.SSL_PROVIDER;
|
||||
|
||||
private Provider sslContextProvider = Defaults.SSL_CONTEXT_PROVIDER;
|
||||
|
||||
private Iterable<String> ciphers = Defaults.CIPHERS;
|
||||
|
||||
private CipherSuiteFilter cipherSuiteFilter = Defaults.CIPHER_SUITE_FILTER;
|
||||
|
@ -174,6 +179,25 @@ public class ClientConfig {
|
|||
|
||||
private HttpProxyHandler httpProxyHandler;
|
||||
|
||||
public ClientConfig setDebug(boolean debug) {
|
||||
this.debug = debug;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClientConfig enableDebug() {
|
||||
this.debug = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClientConfig disableDebug() {
|
||||
this.debug = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isDebug() {
|
||||
return debug;
|
||||
}
|
||||
|
||||
public ClientConfig setThreadCount(int threadCount) {
|
||||
this.threadCount = threadCount;
|
||||
return this;
|
||||
|
@ -255,15 +279,6 @@ public class ClientConfig {
|
|||
return maxChunkSize;
|
||||
}
|
||||
|
||||
public ClientConfig setMaxConnections(int maxConnections) {
|
||||
this.maxConnections = maxConnections;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxConnections() {
|
||||
return maxConnections;
|
||||
}
|
||||
|
||||
public ClientConfig setMaxContentLength(int maxContentLength) {
|
||||
this.maxContentLength = maxContentLength;
|
||||
return this;
|
||||
|
@ -328,6 +343,15 @@ public class ClientConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ClientConfig setSslContextProvider(Provider sslContextProvider) {
|
||||
this.sslContextProvider = sslContextProvider;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Provider getSslContextProvider() {
|
||||
return sslContextProvider;
|
||||
}
|
||||
|
||||
public ClientConfig setCiphers(Iterable<String> ciphers) {
|
||||
this.ciphers = ciphers;
|
||||
return this;
|
||||
|
@ -407,4 +431,14 @@ public class ClientConfig {
|
|||
public HttpProxyHandler getHttpProxyHandler() {
|
||||
return httpProxyHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("SSL=").append(sslProvider)
|
||||
.append(",SSL context provider=").append(sslContextProvider != null ? sslContextProvider.getName() : "<none>");
|
||||
return sb.toString();
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,10 @@ import io.netty.channel.socket.SocketChannel;
|
|||
import io.netty.handler.codec.http.HttpClientCodec;
|
||||
import io.netty.handler.codec.http.HttpContentDecompressor;
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.netty.handler.ssl.SslHandler;
|
||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||
import org.xbib.netty.http.client.ClientConfig;
|
||||
import org.xbib.netty.http.client.HttpAddress;
|
||||
import org.xbib.netty.http.client.handler.TrafficLoggingHandler;
|
||||
|
@ -34,7 +36,9 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
if (clientConfig.isDebug()) {
|
||||
ch.pipeline().addLast(new TrafficLoggingHandler());
|
||||
}
|
||||
if (httpAddress.isSecure()) {
|
||||
configureEncryptedHttp1(ch);
|
||||
} else {
|
||||
|
@ -47,11 +51,13 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
try {
|
||||
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
|
||||
.sslProvider(clientConfig.getSslProvider())
|
||||
.sslContextProvider(clientConfig.getSslContextProvider())
|
||||
.keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(),
|
||||
clientConfig.getKeyPassword())
|
||||
.ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter())
|
||||
.trustManager(clientConfig.getTrustManagerFactory());
|
||||
SslHandler sslHandler = sslContextBuilder.build().newHandler(ch.alloc());
|
||||
SslContext sslContext = sslContextBuilder.build();
|
||||
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
|
||||
SSLEngine engine = sslHandler.engine();
|
||||
if (clientConfig.isServerNameIdentification()) {
|
||||
String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName();
|
||||
|
@ -87,6 +93,10 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
false);
|
||||
httpObjectAggregator.setMaxCumulationBufferComponents(clientConfig.getMaxCompositeBufferComponents());
|
||||
pipeline.addLast(httpObjectAggregator);
|
||||
/*if (clientConfig.isEnableGzip()) {
|
||||
pipeline.addLast(new HttpChunkContentCompressor(6));
|
||||
}
|
||||
pipeline.addLast(new ChunkedWriteHandler());*/
|
||||
pipeline.addLast(httpResponseHandler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package org.xbib.netty.http.client.handler.http1;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http.DefaultHttpContent;
|
||||
import io.netty.handler.codec.http.HttpContentCompressor;
|
||||
|
||||
/**
|
||||
* Be sure you place the HttpChunkContentCompressor before the ChunkedWriteHandler.
|
||||
*/
|
||||
public class HttpChunkContentCompressor extends HttpContentCompressor {
|
||||
|
||||
HttpChunkContentCompressor(int compressionLevel) {
|
||||
super(compressionLevel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf byteBuf = (ByteBuf) msg;
|
||||
if (byteBuf.isReadable()) {
|
||||
msg = new DefaultHttpContent(byteBuf);
|
||||
}
|
||||
}
|
||||
super.write(ctx, msg, promise);
|
||||
}
|
||||
}
|
|
@ -17,7 +17,6 @@ import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
|
|||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.netty.handler.ssl.SslHandler;
|
||||
import io.netty.handler.ssl.SslProvider;
|
||||
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
|
||||
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||||
import org.xbib.netty.http.client.ClientConfig;
|
||||
|
@ -62,28 +61,31 @@ public class Http2ChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
DefaultHttp2Connection http2Connection = new DefaultHttp2Connection(false);
|
||||
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.INFO, "client");
|
||||
Http2ConnectionHandler http2ConnectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
|
||||
HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder()
|
||||
.connection(http2Connection)
|
||||
.frameLogger(frameLogger)
|
||||
.frameListener(new DelegatingDecompressorFrameListener(http2Connection,
|
||||
new InboundHttp2ToHttpAdapterBuilder(http2Connection)
|
||||
.maxContentLength(clientConfig.getMaxContentLength())
|
||||
.propagateSettings(true)
|
||||
.build()))
|
||||
.build();
|
||||
|
||||
.build()));
|
||||
if (clientConfig.isDebug()) {
|
||||
http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client"));
|
||||
}
|
||||
Http2ConnectionHandler http2ConnectionHandler = http2ConnectionHandlerBuilder.build();
|
||||
try {
|
||||
SslContext sslContext = SslContextBuilder.forClient()
|
||||
.sslProvider(SslProvider.JDK)
|
||||
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
|
||||
.sslProvider(clientConfig.getSslProvider())
|
||||
.trustManager(InsecureTrustManagerFactory.INSTANCE)
|
||||
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
|
||||
.applicationProtocolConfig(new ApplicationProtocolConfig(
|
||||
ApplicationProtocolConfig.Protocol.ALPN,
|
||||
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
|
||||
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
|
||||
ApplicationProtocolNames.HTTP_2))
|
||||
.build();
|
||||
ApplicationProtocolNames.HTTP_2));
|
||||
if (clientConfig.getSslContextProvider() != null) {
|
||||
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
|
||||
}
|
||||
SslContext sslContext = sslContextBuilder.build();
|
||||
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
|
||||
SSLEngine engine = sslHandler.engine();
|
||||
if (clientConfig.isServerNameIdentification()) {
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package org.xbib.netty.http.client.pool;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.util.AttributeKey;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.net.ConnectException;
|
||||
import java.util.List;
|
||||
|
||||
public interface ChannelPool extends Closeable {
|
||||
|
||||
AttributeKey<String> NODE_ATTRIBUTE_KEY = AttributeKey.valueOf("node");
|
||||
|
||||
void prepare(int count) throws ConnectException;
|
||||
|
||||
Channel lease() throws ConnectException;
|
||||
|
||||
int lease(List<Channel> channels, int maxCount) throws ConnectException;
|
||||
|
||||
void release(Channel channel);
|
||||
|
||||
void release(List<Channel> channels);
|
||||
}
|
|
@ -0,0 +1,340 @@
|
|||
package org.xbib.netty.http.client.pool;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.pool.ChannelPoolHandler;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
|
||||
public class SimpleChannelPool implements ChannelPool {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName());
|
||||
|
||||
private final Semaphore semaphore;
|
||||
|
||||
private final List<String> nodes;
|
||||
|
||||
private final int numberOfNodes;
|
||||
|
||||
private final int retriesPerNode;
|
||||
|
||||
private final Map<String, Bootstrap> bootstraps;
|
||||
|
||||
private final Map<String, List<Channel>> channels;
|
||||
|
||||
private final Map<String, Queue<Channel>> availableChannels;
|
||||
|
||||
private final Map<String, Integer> counts;
|
||||
|
||||
private final Map<String, Integer> failedCounts;
|
||||
|
||||
private final Lock lock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* @param semaphore the throttle for the concurrency level control
|
||||
* @param nodes the endpoint nodes, any element may contain the port (followed after ":")
|
||||
* to override the defaultPort argument
|
||||
* @param bootstrap bootstrap instance
|
||||
* @param channelPoolHandler channel pool handler being notified upon new connection is created
|
||||
* @param defaultPort default port used to connect (any node address from the nodes set may override this)
|
||||
* @param retriesPerNode the max count of the subsequent connection failures to the node before
|
||||
* the node will be excluded from the pool, 0 means no limit
|
||||
*/
|
||||
public SimpleChannelPool(Semaphore semaphore, List<String> nodes, Bootstrap bootstrap,
|
||||
ChannelPoolHandler channelPoolHandler, int defaultPort, int retriesPerNode) {
|
||||
this.semaphore = semaphore;
|
||||
if (nodes == null || nodes.isEmpty()) {
|
||||
throw new IllegalArgumentException("empty nodes array argument");
|
||||
}
|
||||
this.nodes = nodes;
|
||||
this.retriesPerNode = retriesPerNode;
|
||||
this.numberOfNodes = nodes.size();
|
||||
bootstraps = new HashMap<>(numberOfNodes);
|
||||
channels = new HashMap<>(numberOfNodes);
|
||||
availableChannels = new HashMap<>(numberOfNodes);
|
||||
counts = new HashMap<>(numberOfNodes);
|
||||
failedCounts = new HashMap<>(numberOfNodes);
|
||||
for (String node : nodes) {
|
||||
InetSocketAddress nodeAddr;
|
||||
if (node.contains(":")) {
|
||||
String addrParts[] = node.split(":");
|
||||
nodeAddr = new InetSocketAddress(addrParts[0], Integer.parseInt(addrParts[1]));
|
||||
} else {
|
||||
nodeAddr = new InetSocketAddress(node, defaultPort);
|
||||
}
|
||||
bootstraps.put(node, bootstrap.clone().remoteAddress(nodeAddr)
|
||||
.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel conn) throws Exception {
|
||||
if(!conn.eventLoop().inEventLoop()) {
|
||||
throw new AssertionError();
|
||||
}
|
||||
channelPoolHandler.channelCreated(conn);
|
||||
}
|
||||
}));
|
||||
availableChannels.put(node, new ConcurrentLinkedQueue<>());
|
||||
counts.put(node, 0);
|
||||
failedCounts.put(node, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(int count) throws ConnectException {
|
||||
if (count > 0) {
|
||||
for (int i = 0; i < count; i ++) {
|
||||
Channel channel = connectToAnyNode();
|
||||
if(channel == null) {
|
||||
throw new ConnectException("Failed to pre-create the connections to the target nodes");
|
||||
}
|
||||
String nodeAddr = channel.attr(NODE_ATTRIBUTE_KEY).get();
|
||||
if (channel.isActive()) {
|
||||
Queue<Channel> channelQueue = availableChannels.get(nodeAddr);
|
||||
if (channelQueue != null) {
|
||||
channelQueue.add(channel);
|
||||
}
|
||||
} else {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
logger.info("prepared " + count + " connections");
|
||||
} else {
|
||||
throw new IllegalArgumentException("Connection count should be > 0, but got " + count);
|
||||
}
|
||||
}
|
||||
|
||||
private class CloseChannelListener implements ChannelFutureListener {
|
||||
|
||||
private final String nodeAddr;
|
||||
private final Channel conn;
|
||||
|
||||
private CloseChannelListener(String nodeAddr, Channel conn) {
|
||||
this.nodeAddr = nodeAddr;
|
||||
this.conn = conn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
logger.fine("connection to " + nodeAddr + " closed");
|
||||
lock.lock();
|
||||
try {
|
||||
synchronized (counts) {
|
||||
if(counts.containsKey(nodeAddr)) {
|
||||
counts.put(nodeAddr, counts.get(nodeAddr) - 1);
|
||||
}
|
||||
}
|
||||
synchronized (channels) {
|
||||
List<Channel> nodeConns = channels.get(nodeAddr);
|
||||
if(nodeConns != null) {
|
||||
nodeConns.remove(conn);
|
||||
}
|
||||
}
|
||||
semaphore.release();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Channel connectToAnyNode() throws ConnectException {
|
||||
Channel channel = null;
|
||||
String nodeAddr = null;
|
||||
String nextNodeAddr;
|
||||
int min = Integer.MAX_VALUE;
|
||||
int next;
|
||||
int i = ThreadLocalRandom.current().nextInt(numberOfNodes);
|
||||
for (int j = i; j < numberOfNodes; j ++) {
|
||||
nextNodeAddr = nodes.get(j % numberOfNodes);
|
||||
next = counts.get(nextNodeAddr);
|
||||
if(next == 0) {
|
||||
nodeAddr = nextNodeAddr;
|
||||
break;
|
||||
} else if (next < min) {
|
||||
min = next;
|
||||
nodeAddr = nextNodeAddr;
|
||||
}
|
||||
}
|
||||
if (nodeAddr != null) {
|
||||
logger.fine("trying connection to " + nodeAddr);
|
||||
try {
|
||||
channel = connect(nodeAddr);
|
||||
} catch (Exception e) {
|
||||
logger.warning("failed to create a new connection to " + nodeAddr + ": " + e.toString());
|
||||
if (retriesPerNode > 0) {
|
||||
int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1;
|
||||
failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount);
|
||||
if (selectedNodeFailedConnAttemptsCount > retriesPerNode) {
|
||||
logger.warning("Failed to connect to the node \"" + nodeAddr + "\" "
|
||||
+ selectedNodeFailedConnAttemptsCount + " times successively, "
|
||||
+ "excluding the node from the connection pool forever");
|
||||
counts.put(nodeAddr, Integer.MAX_VALUE);
|
||||
boolean allNodesExcluded = true;
|
||||
for (String node : nodes) {
|
||||
if (counts.get(node) < Integer.MAX_VALUE) {
|
||||
allNodesExcluded = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (allNodesExcluded) {
|
||||
logger.severe("no endpoint nodes left in the connection pool");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (e instanceof ConnectException) {
|
||||
throw (ConnectException) e;
|
||||
} else {
|
||||
throw new ConnectException(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (channel != null) {
|
||||
channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel));
|
||||
channel.attr(NODE_ATTRIBUTE_KEY).set(nodeAddr);
|
||||
channels.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(channel);
|
||||
synchronized(counts) {
|
||||
counts.put(nodeAddr, counts.get(nodeAddr) + 1);
|
||||
}
|
||||
if(retriesPerNode > 0) {
|
||||
failedCounts.put(nodeAddr, 0);
|
||||
}
|
||||
logger.fine("new connection to " + nodeAddr + " created");
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
protected Channel connect(String addr) throws Exception {
|
||||
Bootstrap bootstrap = bootstraps.get(addr);
|
||||
if (bootstrap != null) {
|
||||
return bootstrap.connect().sync().channel();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Channel poll() {
|
||||
int i = ThreadLocalRandom.current().nextInt(numberOfNodes);
|
||||
Queue<Channel> channelQueue;
|
||||
Channel channel;
|
||||
for(int j = i; j < i + numberOfNodes; j ++) {
|
||||
channelQueue = availableChannels.get(nodes.get(j % numberOfNodes));
|
||||
if(channelQueue != null) {
|
||||
channel = channelQueue.poll();
|
||||
if(channel != null && channel.isActive()) {
|
||||
return channel;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel lease() throws ConnectException {
|
||||
Channel conn = null;
|
||||
if (semaphore.tryAcquire()) {
|
||||
if (null == (conn = poll())) {
|
||||
conn = connectToAnyNode();
|
||||
}
|
||||
if (conn == null) {
|
||||
semaphore.release();
|
||||
throw new ConnectException();
|
||||
}
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lease(List<Channel> channels, int maxCount) throws ConnectException {
|
||||
int availableCount = semaphore.drainPermits();
|
||||
if (availableCount == 0) {
|
||||
return availableCount;
|
||||
}
|
||||
if (availableCount > maxCount) {
|
||||
semaphore.release(availableCount - maxCount);
|
||||
availableCount = maxCount;
|
||||
}
|
||||
Channel conn;
|
||||
for (int i = 0; i < availableCount; i ++) {
|
||||
if (null == (conn = poll())) {
|
||||
conn = connectToAnyNode();
|
||||
}
|
||||
if (conn == null) {
|
||||
semaphore.release(availableCount - i);
|
||||
throw new ConnectException();
|
||||
} else {
|
||||
channels.add(conn);
|
||||
}
|
||||
}
|
||||
return availableCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void release(Channel conn) {
|
||||
String nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get();
|
||||
if( conn.isActive()) {
|
||||
Queue<Channel> connQueue = availableChannels.get(nodeAddr);
|
||||
if (connQueue != null) {
|
||||
connQueue.add(conn);
|
||||
}
|
||||
semaphore.release();
|
||||
} else {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void release(List<Channel> conns) {
|
||||
String nodeAddr;
|
||||
Queue<Channel> connQueue;
|
||||
for (Channel conn : conns) {
|
||||
nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get();
|
||||
if (conn.isActive()) {
|
||||
connQueue = availableChannels.get(nodeAddr);
|
||||
connQueue.add(conn);
|
||||
semaphore.release();
|
||||
} else {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
lock.lock();
|
||||
int closedConnCount = 0;
|
||||
for (String nodeAddr: availableChannels.keySet()) {
|
||||
for (Channel conn: availableChannels.get(nodeAddr)) {
|
||||
if (conn.isOpen()) {
|
||||
conn.close();
|
||||
closedConnCount ++;
|
||||
}
|
||||
}
|
||||
}
|
||||
availableChannels.clear();
|
||||
for (String nodeAddr: channels.keySet()) {
|
||||
for (Channel conn: channels.get(nodeAddr)) {
|
||||
if (conn.isOpen()) {
|
||||
conn.close();
|
||||
closedConnCount ++;
|
||||
}
|
||||
}
|
||||
}
|
||||
channels.clear();
|
||||
bootstraps.clear();
|
||||
counts.clear();
|
||||
logger.fine("closed " + closedConnCount + " connections");
|
||||
}
|
||||
}
|
|
@ -107,6 +107,7 @@ abstract class BaseTransport implements Transport {
|
|||
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) :
|
||||
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri,
|
||||
request.content());
|
||||
logger.log(Level.INFO, fullHttpRequest.toString());
|
||||
Integer streamId = nextStream();
|
||||
if (streamId != null) {
|
||||
request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
|
||||
|
|
|
@ -16,12 +16,15 @@ public class AkamaiTest extends LoggingBase {
|
|||
private static final Logger logger = Logger.getLogger("");
|
||||
|
||||
/**
|
||||
* h2_demo_frame.html fails with:
|
||||
* 2018-02-27 23:43:32.048 INFORMATION [client] io.netty.handler.codec.http2.Http2FrameLogger
|
||||
* logRstStream [id: 0x4fe29f1e, L:/192.168.178.23:49429 - R:http2.akamai.com/104.94.191.203:443]
|
||||
* INBOUND RST_STREAM: streamId=2 errorCode=8
|
||||
* 2018-02-27 23:43:32.049 SCHWERWIEGEND [] org.xbib.netty.http.client.test.a.AkamaiTest lambda$testAkamaiHttps$0
|
||||
* HTTP/2 to HTTP layer caught stream reset
|
||||
* io.netty.handler.codec.http2.Http2Exception$StreamException: HTTP/2 to HTTP layer caught stream reset
|
||||
*
|
||||
* TODO(jprante) catch all promised pushes
|
||||
*/
|
||||
@Test
|
||||
public void testAkamaiHttps() {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package org.xbib.netty.http.client.test;
|
||||
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.xbib.netty.http.client.Client;
|
||||
import org.xbib.netty.http.client.HttpAddress;
|
||||
|
@ -17,7 +16,6 @@ public class ClientTest {
|
|||
private static final Logger logger = Logger.getLogger(ClientTest.class.getName());
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testHttp1() throws Exception {
|
||||
Client client = new Client();
|
||||
try {
|
||||
|
@ -37,7 +35,6 @@ public class ClientTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testHttp1ParallelRequests() {
|
||||
Client client = new Client();
|
||||
try {
|
||||
|
@ -65,7 +62,6 @@ public class ClientTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testHttp2() throws Exception {
|
||||
String host = "webtide.com";
|
||||
Client client = new Client();
|
||||
|
@ -74,11 +70,12 @@ public class ClientTest {
|
|||
Transport transport = client.newTransport(HttpAddress.http2(host));
|
||||
transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
|
||||
msg.headers().entries() +
|
||||
msg.content().toString(StandardCharsets.UTF_8) +
|
||||
//msg.content().toString(StandardCharsets.UTF_8) +
|
||||
" status=" + msg.status().code()));
|
||||
transport.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " +
|
||||
msg.headers().entries() +
|
||||
msg.content().toString(StandardCharsets.UTF_8)));
|
||||
//msg.content().toString(StandardCharsets.UTF_8) +
|
||||
" status=" + msg.status().code()));
|
||||
transport.connect();
|
||||
transport.awaitSettings();
|
||||
simpleRequest(transport);
|
||||
|
@ -115,7 +112,6 @@ public class ClientTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testHttp2TwoRequestsOnSameConnection() {
|
||||
Client client = new Client();
|
||||
try {
|
||||
|
@ -151,9 +147,8 @@ public class ClientTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testMixed() throws Exception {
|
||||
Client client = new Client();
|
||||
public void testTwoTransports() throws Exception {
|
||||
Client client = Client.builder().enableDebug().build();
|
||||
try {
|
||||
Transport transport = client.newTransport(HttpAddress.http1("xbib.org"));
|
||||
transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
|
||||
|
@ -178,7 +173,9 @@ public class ClientTest {
|
|||
}
|
||||
|
||||
private void simpleRequest(Transport transport) {
|
||||
transport.execute(Request.builder(HttpMethod.GET).setURL(transport.httpAddress().base()).build());
|
||||
transport.execute(Request.builder(HttpMethod.GET)
|
||||
.setVersion(transport.httpAddress().getVersion())
|
||||
.setURL(transport.httpAddress().base()).build());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package org.xbib.netty.http.client.test;
|
||||
|
||||
import org.conscrypt.Conscrypt;
|
||||
import org.junit.Test;
|
||||
import org.xbib.netty.http.client.Client;
|
||||
import org.xbib.netty.http.client.Request;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class ConscryptTest extends LoggingBase {
|
||||
|
||||
private static final Logger logger = Logger.getLogger("");
|
||||
|
||||
@Test
|
||||
public void testConscrypt() {
|
||||
Client client = Client.builder()
|
||||
.enableDebug()
|
||||
.setJdkSslProvider()
|
||||
.setSslContextProvider(Conscrypt.newProvider())
|
||||
.build();
|
||||
logger.log(Level.INFO, client.getClientConfig().toString());
|
||||
try {
|
||||
Request request = Request.get()
|
||||
.setURL("https://fl-test.hbz-nrw.de")
|
||||
.setVersion("HTTP/2.0")
|
||||
.build()
|
||||
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))
|
||||
.setResponseListener(fullHttpResponse -> {
|
||||
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
|
||||
logger.log(Level.INFO, "status = " + fullHttpResponse.status()
|
||||
+ " response body = " + response);
|
||||
});
|
||||
client.execute(request).get();
|
||||
} finally {
|
||||
client.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue