From 047ae5bffd39b294f7f4355d8769f9b7f5570540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Thu, 1 Mar 2018 22:08:16 +0100 Subject: [PATCH] add simple channel pool, add SSL provider (Conscrypt), fix ClientTest, add debug config --- build.gradle | 5 +- gradle.properties | 5 +- .../org/xbib/netty/http/client/Client.java | 25 +- .../xbib/netty/http/client/ClientBuilder.java | 21 +- .../xbib/netty/http/client/ClientConfig.java | 74 ++-- .../handler/http1/HttpChannelInitializer.java | 14 +- .../http1/HttpChunkContentCompressor.java | 28 ++ .../http2/Http2ChannelInitializer.java | 24 +- .../netty/http/client/pool/ChannelPool.java | 23 ++ .../http/client/pool/SimpleChannelPool.java | 340 ++++++++++++++++++ .../http/client/transport/BaseTransport.java | 1 + .../netty/http/client/test/AkamaiTest.java | 3 + .../netty/http/client/test/ClientTest.java | 19 +- .../netty/http/client/test/ConscryptTest.java | 40 +++ 14 files changed, 559 insertions(+), 63 deletions(-) create mode 100644 src/main/java/org/xbib/netty/http/client/handler/http1/HttpChunkContentCompressor.java create mode 100644 src/main/java/org/xbib/netty/http/client/pool/ChannelPool.java create mode 100644 src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java create mode 100644 src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java diff --git a/build.gradle b/build.gradle index b18f2a1..7615225 100644 --- a/build.gradle +++ b/build.gradle @@ -34,10 +34,11 @@ configurations { } dependencies { + compile "org.xbib:net-url:${project.property('xbib-net-url.version')}" compile "io.netty:netty-codec-http2:${project.property('netty.version')}" compile "io.netty:netty-handler-proxy:${project.property('netty.version')}" - compile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" - compile "org.xbib:net-url:${project.property('xbib-net-url.version')}" + testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" + testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}" testCompile "junit:junit:${project.property('junit.version')}" testCompile "com.fasterxml.jackson.core:jackson-databind:${project.property('jackson.version')}" alpnagent "org.mortbay.jetty.alpn:jetty-alpn-agent:${project.property('alpnagent.version')}" diff --git a/gradle.properties b/gradle.properties index d642e95..3885d55 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,9 +1,10 @@ group = org.xbib name = netty-http-client -version = 4.1.19.0 +version = 4.1.19.1 netty.version = 4.1.19.Final -tcnative.version = 2.0.1.Final +tcnative.version = 2.0.7.Final +conscrypt.version = 1.0.1 xbib-net-url.version = 1.1.0 alpnagent.version = 2.0.7 junit.version = 4.12 diff --git a/src/main/java/org/xbib/netty/http/client/Client.java b/src/main/java/org/xbib/netty/http/client/Client.java index 2324f39..30579f2 100644 --- a/src/main/java/org/xbib/netty/http/client/Client.java +++ b/src/main/java/org/xbib/netty/http/client/Client.java @@ -100,6 +100,10 @@ public final class Client { return new ClientBuilder(); } + public ClientConfig getClientConfig() { + return clientConfig; + } + public ByteBufAllocator getByteBufAllocator() { return byteBufAllocator; } @@ -145,6 +149,17 @@ public final class Client { .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); } + public Transport execute(Request request) { + Transport nextTransport = newTransport(HttpAddress.of(request)); + nextTransport.execute(request); + return nextTransport; + } + + public CompletableFuture execute(Request request, + Function supplier) { + return newTransport(HttpAddress.of(request)).execute(request, supplier); + } + /** * For following redirects by a chain of transports. * @param transport the previous transport @@ -163,16 +178,6 @@ public final class Client { close(nextTransport); } - public Transport execute(Request request) { - Transport nextTransport = newTransport(HttpAddress.of(request)); - nextTransport.execute(request); - return nextTransport; - } - - public CompletableFuture execute(Request request, - Function supplier) { - return newTransport(HttpAddress.of(request)).execute(request, supplier); - } public Transport prepareRequest(Request request) { return newTransport(HttpAddress.of(request)); diff --git a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java index d0e3bcf..8727167 100644 --- a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java @@ -9,6 +9,7 @@ import io.netty.handler.ssl.SslProvider; import javax.net.ssl.TrustManagerFactory; import java.io.InputStream; +import java.security.Provider; public class ClientBuilder { @@ -24,6 +25,16 @@ public class ClientBuilder { this.clientConfig = new ClientConfig(); } + public ClientBuilder enableDebug() { + clientConfig.enableDebug(); + return this; + } + + public ClientBuilder disableDebug() { + clientConfig.disableDebug(); + return this; + } + /** * Set byte buf allocator for payload in HTTP requests. * @param byteBufAllocator the byte buf allocator @@ -104,11 +115,6 @@ public class ClientBuilder { return this; } - public ClientBuilder setMaxConnections(int maxConnections) { - clientConfig.setMaxConnections(maxConnections); - return this; - } - public ClientBuilder setReadTimeoutMillis(int readTimeoutMillis) { clientConfig.setReadTimeoutMillis(readTimeoutMillis); return this; @@ -134,6 +140,11 @@ public class ClientBuilder { return this; } + public ClientBuilder setSslContextProvider(Provider provider) { + clientConfig.setSslContextProvider(provider); + return this; + } + public ClientBuilder setCiphers(Iterable ciphers) { clientConfig.setCiphers(ciphers); return this; diff --git a/src/main/java/org/xbib/netty/http/client/ClientConfig.java b/src/main/java/org/xbib/netty/http/client/ClientConfig.java index dd168f9..eef73fc 100644 --- a/src/main/java/org/xbib/netty/http/client/ClientConfig.java +++ b/src/main/java/org/xbib/netty/http/client/ClientConfig.java @@ -10,11 +10,17 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import javax.net.ssl.TrustManagerFactory; import java.io.InputStream; +import java.security.Provider; public class ClientConfig { interface Defaults { + /** + * If frame logging /traffic logging is enabled or not. + */ + boolean DEBUG = false; + /** * Default for thread count. */ @@ -74,12 +80,6 @@ public class ClientConfig { */ int MAX_COMPOSITE_BUFFER_COMPONENTS = 1024; - /** - * Allow maximum concurrent connections. - * Usually, browsers restrict concurrent connections to 8 for a single address. - */ - int MAX_CONNECTIONS = 8; - /** * Default read/write timeout in milliseconds. */ @@ -93,11 +93,15 @@ public class ClientConfig { /** * Default SSL provider. */ - SslProvider SSL_PROVIDER = OpenSsl.isAvailable() && OpenSsl.isAlpnSupported() ? - SslProvider.OPENSSL : SslProvider.JDK; + SslProvider SSL_PROVIDER = SslProvider.JDK; /** - * Default ciphers. + * Default SSL context provider (for JDK SSL only). + */ + Provider SSL_CONTEXT_PROVIDER = null; + + /** + * Default ciphers. We care about HTTP/2. */ Iterable CIPHERS = Http2SecurityUtil.CIPHERS; @@ -119,6 +123,7 @@ public class ClientConfig { ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE; } + private boolean debug = Defaults.DEBUG; /** * If set to 0, then Netty will decide about thread count. @@ -142,8 +147,6 @@ public class ClientConfig { private int maxChunkSize = Defaults.MAX_CHUNK_SIZE; - private int maxConnections = Defaults.MAX_CONNECTIONS; - private int maxContentLength = Defaults.MAX_CONTENT_LENGTH; private int maxCompositeBufferComponents = Defaults.MAX_COMPOSITE_BUFFER_COMPONENTS; @@ -156,6 +159,8 @@ public class ClientConfig { private SslProvider sslProvider = Defaults.SSL_PROVIDER; + private Provider sslContextProvider = Defaults.SSL_CONTEXT_PROVIDER; + private Iterable ciphers = Defaults.CIPHERS; private CipherSuiteFilter cipherSuiteFilter = Defaults.CIPHER_SUITE_FILTER; @@ -174,6 +179,25 @@ public class ClientConfig { private HttpProxyHandler httpProxyHandler; + public ClientConfig setDebug(boolean debug) { + this.debug = debug; + return this; + } + + public ClientConfig enableDebug() { + this.debug = true; + return this; + } + + public ClientConfig disableDebug() { + this.debug = false; + return this; + } + + public boolean isDebug() { + return debug; + } + public ClientConfig setThreadCount(int threadCount) { this.threadCount = threadCount; return this; @@ -255,15 +279,6 @@ public class ClientConfig { return maxChunkSize; } - public ClientConfig setMaxConnections(int maxConnections) { - this.maxConnections = maxConnections; - return this; - } - - public int getMaxConnections() { - return maxConnections; - } - public ClientConfig setMaxContentLength(int maxContentLength) { this.maxContentLength = maxContentLength; return this; @@ -328,6 +343,15 @@ public class ClientConfig { return this; } + public ClientConfig setSslContextProvider(Provider sslContextProvider) { + this.sslContextProvider = sslContextProvider; + return this; + } + + public Provider getSslContextProvider() { + return sslContextProvider; + } + public ClientConfig setCiphers(Iterable ciphers) { this.ciphers = ciphers; return this; @@ -407,4 +431,14 @@ public class ClientConfig { public HttpProxyHandler getHttpProxyHandler() { return httpProxyHandler; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("SSL=").append(sslProvider) + .append(",SSL context provider=").append(sslContextProvider != null ? sslContextProvider.getName() : ""); + return sb.toString(); + + + } } diff --git a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java index 2301e43..975298b 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java @@ -6,8 +6,10 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.handler.TrafficLoggingHandler; @@ -34,7 +36,9 @@ public class HttpChannelInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) { - ch.pipeline().addLast(new TrafficLoggingHandler()); + if (clientConfig.isDebug()) { + ch.pipeline().addLast(new TrafficLoggingHandler()); + } if (httpAddress.isSecure()) { configureEncryptedHttp1(ch); } else { @@ -47,11 +51,13 @@ public class HttpChannelInitializer extends ChannelInitializer { try { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() .sslProvider(clientConfig.getSslProvider()) + .sslContextProvider(clientConfig.getSslContextProvider()) .keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(), clientConfig.getKeyPassword()) .ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter()) .trustManager(clientConfig.getTrustManagerFactory()); - SslHandler sslHandler = sslContextBuilder.build().newHandler(ch.alloc()); + SslContext sslContext = sslContextBuilder.build(); + SslHandler sslHandler = sslContext.newHandler(ch.alloc()); SSLEngine engine = sslHandler.engine(); if (clientConfig.isServerNameIdentification()) { String fullQualifiedHostname = httpAddress.getInetSocketAddress().getHostName(); @@ -87,6 +93,10 @@ public class HttpChannelInitializer extends ChannelInitializer { false); httpObjectAggregator.setMaxCumulationBufferComponents(clientConfig.getMaxCompositeBufferComponents()); pipeline.addLast(httpObjectAggregator); + /*if (clientConfig.isEnableGzip()) { + pipeline.addLast(new HttpChunkContentCompressor(6)); + } + pipeline.addLast(new ChunkedWriteHandler());*/ pipeline.addLast(httpResponseHandler); } } diff --git a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChunkContentCompressor.java b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChunkContentCompressor.java new file mode 100644 index 0000000..1643365 --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChunkContentCompressor.java @@ -0,0 +1,28 @@ +package org.xbib.netty.http.client.handler.http1; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.HttpContentCompressor; + +/** + * Be sure you place the HttpChunkContentCompressor before the ChunkedWriteHandler. + */ +public class HttpChunkContentCompressor extends HttpContentCompressor { + + HttpChunkContentCompressor(int compressionLevel) { + super(compressionLevel); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf byteBuf = (ByteBuf) msg; + if (byteBuf.isReadable()) { + msg = new DefaultHttpContent(byteBuf); + } + } + super.write(ctx, msg, promise); + } +} diff --git a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java index 5bf5ca1..4d5ec95 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java @@ -17,7 +17,6 @@ import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SupportedCipherSuiteFilter; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.xbib.netty.http.client.ClientConfig; @@ -62,28 +61,31 @@ public class Http2ChannelInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) { DefaultHttp2Connection http2Connection = new DefaultHttp2Connection(false); - Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.INFO, "client"); - Http2ConnectionHandler http2ConnectionHandler = new HttpToHttp2ConnectionHandlerBuilder() + HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder() .connection(http2Connection) - .frameLogger(frameLogger) .frameListener(new DelegatingDecompressorFrameListener(http2Connection, new InboundHttp2ToHttpAdapterBuilder(http2Connection) .maxContentLength(clientConfig.getMaxContentLength()) .propagateSettings(true) - .build())) - .build(); - + .build())); + if (clientConfig.isDebug()) { + http2ConnectionHandlerBuilder.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client")); + } + Http2ConnectionHandler http2ConnectionHandler = http2ConnectionHandlerBuilder.build(); try { - SslContext sslContext = SslContextBuilder.forClient() - .sslProvider(SslProvider.JDK) + SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() + .sslProvider(clientConfig.getSslProvider()) .trustManager(InsecureTrustManagerFactory.INSTANCE) .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) .applicationProtocolConfig(new ApplicationProtocolConfig( ApplicationProtocolConfig.Protocol.ALPN, ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, - ApplicationProtocolNames.HTTP_2)) - .build(); + ApplicationProtocolNames.HTTP_2)); + if (clientConfig.getSslContextProvider() != null) { + sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); + } + SslContext sslContext = sslContextBuilder.build(); SslHandler sslHandler = sslContext.newHandler(ch.alloc()); SSLEngine engine = sslHandler.engine(); if (clientConfig.isServerNameIdentification()) { diff --git a/src/main/java/org/xbib/netty/http/client/pool/ChannelPool.java b/src/main/java/org/xbib/netty/http/client/pool/ChannelPool.java new file mode 100644 index 0000000..367d82a --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/pool/ChannelPool.java @@ -0,0 +1,23 @@ +package org.xbib.netty.http.client.pool; + +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; + +import java.io.Closeable; +import java.net.ConnectException; +import java.util.List; + +public interface ChannelPool extends Closeable { + + AttributeKey NODE_ATTRIBUTE_KEY = AttributeKey.valueOf("node"); + + void prepare(int count) throws ConnectException; + + Channel lease() throws ConnectException; + + int lease(List channels, int maxCount) throws ConnectException; + + void release(Channel channel); + + void release(List channels); +} diff --git a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java b/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java new file mode 100644 index 0000000..b3f7860 --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java @@ -0,0 +1,340 @@ +package org.xbib.netty.http.client.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.pool.ChannelPoolHandler; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; + + +public class SimpleChannelPool implements ChannelPool { + + private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName()); + + private final Semaphore semaphore; + + private final List nodes; + + private final int numberOfNodes; + + private final int retriesPerNode; + + private final Map bootstraps; + + private final Map> channels; + + private final Map> availableChannels; + + private final Map counts; + + private final Map failedCounts; + + private final Lock lock = new ReentrantLock(); + + /** + * @param semaphore the throttle for the concurrency level control + * @param nodes the endpoint nodes, any element may contain the port (followed after ":") + * to override the defaultPort argument + * @param bootstrap bootstrap instance + * @param channelPoolHandler channel pool handler being notified upon new connection is created + * @param defaultPort default port used to connect (any node address from the nodes set may override this) + * @param retriesPerNode the max count of the subsequent connection failures to the node before + * the node will be excluded from the pool, 0 means no limit + */ + public SimpleChannelPool(Semaphore semaphore, List nodes, Bootstrap bootstrap, + ChannelPoolHandler channelPoolHandler, int defaultPort, int retriesPerNode) { + this.semaphore = semaphore; + if (nodes == null || nodes.isEmpty()) { + throw new IllegalArgumentException("empty nodes array argument"); + } + this.nodes = nodes; + this.retriesPerNode = retriesPerNode; + this.numberOfNodes = nodes.size(); + bootstraps = new HashMap<>(numberOfNodes); + channels = new HashMap<>(numberOfNodes); + availableChannels = new HashMap<>(numberOfNodes); + counts = new HashMap<>(numberOfNodes); + failedCounts = new HashMap<>(numberOfNodes); + for (String node : nodes) { + InetSocketAddress nodeAddr; + if (node.contains(":")) { + String addrParts[] = node.split(":"); + nodeAddr = new InetSocketAddress(addrParts[0], Integer.parseInt(addrParts[1])); + } else { + nodeAddr = new InetSocketAddress(node, defaultPort); + } + bootstraps.put(node, bootstrap.clone().remoteAddress(nodeAddr) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel conn) throws Exception { + if(!conn.eventLoop().inEventLoop()) { + throw new AssertionError(); + } + channelPoolHandler.channelCreated(conn); + } + })); + availableChannels.put(node, new ConcurrentLinkedQueue<>()); + counts.put(node, 0); + failedCounts.put(node, 0); + } + } + + @Override + public void prepare(int count) throws ConnectException { + if (count > 0) { + for (int i = 0; i < count; i ++) { + Channel channel = connectToAnyNode(); + if(channel == null) { + throw new ConnectException("Failed to pre-create the connections to the target nodes"); + } + String nodeAddr = channel.attr(NODE_ATTRIBUTE_KEY).get(); + if (channel.isActive()) { + Queue channelQueue = availableChannels.get(nodeAddr); + if (channelQueue != null) { + channelQueue.add(channel); + } + } else { + channel.close(); + } + } + logger.info("prepared " + count + " connections"); + } else { + throw new IllegalArgumentException("Connection count should be > 0, but got " + count); + } + } + + private class CloseChannelListener implements ChannelFutureListener { + + private final String nodeAddr; + private final Channel conn; + + private CloseChannelListener(String nodeAddr, Channel conn) { + this.nodeAddr = nodeAddr; + this.conn = conn; + } + + @Override + public void operationComplete(ChannelFuture future) { + logger.fine("connection to " + nodeAddr + " closed"); + lock.lock(); + try { + synchronized (counts) { + if(counts.containsKey(nodeAddr)) { + counts.put(nodeAddr, counts.get(nodeAddr) - 1); + } + } + synchronized (channels) { + List nodeConns = channels.get(nodeAddr); + if(nodeConns != null) { + nodeConns.remove(conn); + } + } + semaphore.release(); + } finally { + lock.unlock(); + } + } + } + + private Channel connectToAnyNode() throws ConnectException { + Channel channel = null; + String nodeAddr = null; + String nextNodeAddr; + int min = Integer.MAX_VALUE; + int next; + int i = ThreadLocalRandom.current().nextInt(numberOfNodes); + for (int j = i; j < numberOfNodes; j ++) { + nextNodeAddr = nodes.get(j % numberOfNodes); + next = counts.get(nextNodeAddr); + if(next == 0) { + nodeAddr = nextNodeAddr; + break; + } else if (next < min) { + min = next; + nodeAddr = nextNodeAddr; + } + } + if (nodeAddr != null) { + logger.fine("trying connection to " + nodeAddr); + try { + channel = connect(nodeAddr); + } catch (Exception e) { + logger.warning("failed to create a new connection to " + nodeAddr + ": " + e.toString()); + if (retriesPerNode > 0) { + int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1; + failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount); + if (selectedNodeFailedConnAttemptsCount > retriesPerNode) { + logger.warning("Failed to connect to the node \"" + nodeAddr + "\" " + + selectedNodeFailedConnAttemptsCount + " times successively, " + + "excluding the node from the connection pool forever"); + counts.put(nodeAddr, Integer.MAX_VALUE); + boolean allNodesExcluded = true; + for (String node : nodes) { + if (counts.get(node) < Integer.MAX_VALUE) { + allNodesExcluded = false; + break; + } + } + if (allNodesExcluded) { + logger.severe("no endpoint nodes left in the connection pool"); + } + } + } + if (e instanceof ConnectException) { + throw (ConnectException) e; + } else { + throw new ConnectException(e.getMessage()); + } + } + } + if (channel != null) { + channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel)); + channel.attr(NODE_ATTRIBUTE_KEY).set(nodeAddr); + channels.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(channel); + synchronized(counts) { + counts.put(nodeAddr, counts.get(nodeAddr) + 1); + } + if(retriesPerNode > 0) { + failedCounts.put(nodeAddr, 0); + } + logger.fine("new connection to " + nodeAddr + " created"); + } + return channel; + } + + protected Channel connect(String addr) throws Exception { + Bootstrap bootstrap = bootstraps.get(addr); + if (bootstrap != null) { + return bootstrap.connect().sync().channel(); + } + return null; + } + + private Channel poll() { + int i = ThreadLocalRandom.current().nextInt(numberOfNodes); + Queue channelQueue; + Channel channel; + for(int j = i; j < i + numberOfNodes; j ++) { + channelQueue = availableChannels.get(nodes.get(j % numberOfNodes)); + if(channelQueue != null) { + channel = channelQueue.poll(); + if(channel != null && channel.isActive()) { + return channel; + } + } + } + return null; + } + + @Override + public Channel lease() throws ConnectException { + Channel conn = null; + if (semaphore.tryAcquire()) { + if (null == (conn = poll())) { + conn = connectToAnyNode(); + } + if (conn == null) { + semaphore.release(); + throw new ConnectException(); + } + } + return conn; + } + + @Override + public int lease(List channels, int maxCount) throws ConnectException { + int availableCount = semaphore.drainPermits(); + if (availableCount == 0) { + return availableCount; + } + if (availableCount > maxCount) { + semaphore.release(availableCount - maxCount); + availableCount = maxCount; + } + Channel conn; + for (int i = 0; i < availableCount; i ++) { + if (null == (conn = poll())) { + conn = connectToAnyNode(); + } + if (conn == null) { + semaphore.release(availableCount - i); + throw new ConnectException(); + } else { + channels.add(conn); + } + } + return availableCount; + } + + @Override + public void release(Channel conn) { + String nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get(); + if( conn.isActive()) { + Queue connQueue = availableChannels.get(nodeAddr); + if (connQueue != null) { + connQueue.add(conn); + } + semaphore.release(); + } else { + conn.close(); + } + } + + @Override + public void release(List conns) { + String nodeAddr; + Queue connQueue; + for (Channel conn : conns) { + nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get(); + if (conn.isActive()) { + connQueue = availableChannels.get(nodeAddr); + connQueue.add(conn); + semaphore.release(); + } else { + conn.close(); + } + } + } + + @Override + public void close() { + lock.lock(); + int closedConnCount = 0; + for (String nodeAddr: availableChannels.keySet()) { + for (Channel conn: availableChannels.get(nodeAddr)) { + if (conn.isOpen()) { + conn.close(); + closedConnCount ++; + } + } + } + availableChannels.clear(); + for (String nodeAddr: channels.keySet()) { + for (Channel conn: channels.get(nodeAddr)) { + if (conn.isOpen()) { + conn.close(); + closedConnCount ++; + } + } + } + channels.clear(); + bootstraps.clear(); + counts.clear(); + logger.fine("closed " + closedConnCount + " connections"); + } +} diff --git a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index 3887355..76be0d4 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -107,6 +107,7 @@ abstract class BaseTransport implements Transport { new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) : new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri, request.content()); + logger.log(Level.INFO, fullHttpRequest.toString()); Integer streamId = nextStream(); if (streamId != null) { request.headers().add(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); diff --git a/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java b/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java index bbc6466..8b1164c 100644 --- a/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java @@ -16,12 +16,15 @@ public class AkamaiTest extends LoggingBase { private static final Logger logger = Logger.getLogger(""); /** + * h2_demo_frame.html fails with: * 2018-02-27 23:43:32.048 INFORMATION [client] io.netty.handler.codec.http2.Http2FrameLogger * logRstStream [id: 0x4fe29f1e, L:/192.168.178.23:49429 - R:http2.akamai.com/104.94.191.203:443] * INBOUND RST_STREAM: streamId=2 errorCode=8 * 2018-02-27 23:43:32.049 SCHWERWIEGEND [] org.xbib.netty.http.client.test.a.AkamaiTest lambda$testAkamaiHttps$0 * HTTP/2 to HTTP layer caught stream reset * io.netty.handler.codec.http2.Http2Exception$StreamException: HTTP/2 to HTTP layer caught stream reset + * + * TODO(jprante) catch all promised pushes */ @Test public void testAkamaiHttps() { diff --git a/src/test/java/org/xbib/netty/http/client/test/ClientTest.java b/src/test/java/org/xbib/netty/http/client/test/ClientTest.java index 8af9b65..010e015 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ClientTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ClientTest.java @@ -1,7 +1,6 @@ package org.xbib.netty.http.client.test; import io.netty.handler.codec.http.HttpMethod; -import org.junit.Ignore; import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; @@ -17,7 +16,6 @@ public class ClientTest { private static final Logger logger = Logger.getLogger(ClientTest.class.getName()); @Test - @Ignore public void testHttp1() throws Exception { Client client = new Client(); try { @@ -37,7 +35,6 @@ public class ClientTest { } @Test - @Ignore public void testHttp1ParallelRequests() { Client client = new Client(); try { @@ -65,7 +62,6 @@ public class ClientTest { } @Test - @Ignore public void testHttp2() throws Exception { String host = "webtide.com"; Client client = new Client(); @@ -74,11 +70,12 @@ public class ClientTest { Transport transport = client.newTransport(HttpAddress.http2(host)); transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + msg.headers().entries() + - msg.content().toString(StandardCharsets.UTF_8) + + //msg.content().toString(StandardCharsets.UTF_8) + " status=" + msg.status().code())); transport.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " + msg.headers().entries() + - msg.content().toString(StandardCharsets.UTF_8))); + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); transport.connect(); transport.awaitSettings(); simpleRequest(transport); @@ -115,7 +112,6 @@ public class ClientTest { } @Test - @Ignore public void testHttp2TwoRequestsOnSameConnection() { Client client = new Client(); try { @@ -151,9 +147,8 @@ public class ClientTest { } @Test - @Ignore - public void testMixed() throws Exception { - Client client = new Client(); + public void testTwoTransports() throws Exception { + Client client = Client.builder().enableDebug().build(); try { Transport transport = client.newTransport(HttpAddress.http1("xbib.org")); transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + @@ -178,7 +173,9 @@ public class ClientTest { } private void simpleRequest(Transport transport) { - transport.execute(Request.builder(HttpMethod.GET).setURL(transport.httpAddress().base()).build()); + transport.execute(Request.builder(HttpMethod.GET) + .setVersion(transport.httpAddress().getVersion()) + .setURL(transport.httpAddress().base()).build()); } } diff --git a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java new file mode 100644 index 0000000..1c15f5e --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java @@ -0,0 +1,40 @@ +package org.xbib.netty.http.client.test; + +import org.conscrypt.Conscrypt; +import org.junit.Test; +import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.Request; + +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ConscryptTest extends LoggingBase { + + private static final Logger logger = Logger.getLogger(""); + + @Test + public void testConscrypt() { + Client client = Client.builder() + .enableDebug() + .setJdkSslProvider() + .setSslContextProvider(Conscrypt.newProvider()) + .build(); + logger.log(Level.INFO, client.getClientConfig().toString()); + try { + Request request = Request.get() + .setURL("https://fl-test.hbz-nrw.de") + .setVersion("HTTP/2.0") + .build() + .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)) + .setResponseListener(fullHttpResponse -> { + String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); + logger.log(Level.INFO, "status = " + fullHttpResponse.status() + + " response body = " + response); + }); + client.execute(request).get(); + } finally { + client.shutdownGracefully(); + } + } +}