diff --git a/build.gradle b/build.gradle index 3779d54..0f42c9e 100644 --- a/build.gradle +++ b/build.gradle @@ -67,7 +67,7 @@ test { jvmArgs "-javaagent:" + configurations.alpnagent.asPath } testLogging { - showStandardStreams = true + showStandardStreams = false exceptionFormat = 'full' } } diff --git a/gradle.properties b/gradle.properties index d0ddc09..1094899 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,8 +1,8 @@ group = org.xbib name = netty-http-client -version = 4.1.22.2 +version = 4.1.16.1 -netty.version = 4.1.22.Final +netty.version = 4.1.16.Final tcnative.version = 2.0.7.Final conscrypt.version = 1.0.1 xbib-net-url.version = 1.1.0 diff --git a/src/main/java/org/xbib/netty/http/client/Client.java b/src/main/java/org/xbib/netty/http/client/Client.java index f0dca21..76c9365 100644 --- a/src/main/java/org/xbib/netty/http/client/Client.java +++ b/src/main/java/org/xbib/netty/http/client/Client.java @@ -40,6 +40,7 @@ import javax.net.ssl.SSLException; import javax.net.ssl.SSLParameters; import javax.net.ssl.TrustManagerFactory; import java.io.IOException; +import java.net.InetSocketAddress; import java.security.KeyStoreException; import java.util.ArrayList; import java.util.Collections; @@ -136,7 +137,7 @@ public final class Client { this.http2SettingsHandler = new Http2SettingsHandler(); this.http2ResponseHandler = new Http2ResponseHandler(); this.transports = new CopyOnWriteArrayList<>(); - if (hasPooledConnections()) { + if (!clientConfig.getPoolNodes().isEmpty()) { List nodes = clientConfig.getPoolNodes(); Integer limit = clientConfig.getPoolNodeConnectionLimit(); if (limit == null || limit < 1) { @@ -183,7 +184,7 @@ public final class Client { } public boolean hasPooledConnections() { - return !clientConfig.getPoolNodes().isEmpty(); + return pool != null && !clientConfig.getPoolNodes().isEmpty(); } public BoundedChannelPool getPool() { @@ -222,13 +223,13 @@ public final class Client { } else { transport = new Http2Transport(this, null); } + } else { + throw new IllegalStateException(); } - if (transport != null) { - if (transportListener != null) { - transportListener.onOpen(transport); - } - transports.add(transport); + if (transportListener != null) { + transportListener.onOpen(transport); } + transports.add(transport); return transport; } @@ -270,15 +271,15 @@ public final class Client { } public void releaseChannel(Channel channel) throws IOException{ - if (hasPooledConnections()) { - try { - pool.release(channel); - } catch (Exception e) { - throw new IOException(e); - } - } else { - if (channel != null) { - channel.close(); + if (channel != null) { + if (hasPooledConnections()) { + try { + pool.release(channel); + } catch (Exception e) { + throw new IOException(e); + } + } else { + channel.close(); } } } @@ -378,14 +379,16 @@ public final class Client { private static SslHandler newSslHandler(ClientConfig clientConfig, ByteBufAllocator allocator, HttpAddress httpAddress) { try { - SslContext sslContext = newSslContext(clientConfig); - SslHandler sslHandler = sslContext.newHandler(allocator); + SslContext sslContext = newSslContext(clientConfig, httpAddress.getVersion()); + InetSocketAddress peer = httpAddress.getInetSocketAddress(); + SslHandler sslHandler = sslContext.newHandler(allocator, peer.getHostName(), peer.getPort()); SSLEngine engine = sslHandler.engine(); List serverNames = clientConfig.getServerNamesForIdentification(); if (serverNames.isEmpty()) { - serverNames = Collections.singletonList(httpAddress.getInetSocketAddress().getHostName()); + serverNames = Collections.singletonList(peer.getHostName()); } SSLParameters params = engine.getSSLParameters(); + // use sslContext.newHandler(allocator, peerHost, peerPort) when using params.setEndpointIdentificationAlgorithm params.setEndpointIdentificationAlgorithm("HTTPS"); List sniServerNames = new ArrayList<>(); for (String serverName : serverNames) { @@ -409,11 +412,11 @@ public final class Client { } } - private static SslContext newSslContext(ClientConfig clientConfig) throws SSLException { + private static SslContext newSslContext(ClientConfig clientConfig, HttpVersion httpVersion) throws SSLException { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() .sslProvider(clientConfig.getSslProvider()) .ciphers(Http2SecurityUtil.CIPHERS, clientConfig.getCipherSuiteFilter()) - .applicationProtocolConfig(newApplicationProtocolConfig()); + .applicationProtocolConfig(newApplicationProtocolConfig(httpVersion)); if (clientConfig.getSslContextProvider() != null) { sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); } @@ -423,11 +426,16 @@ public final class Client { return sslContextBuilder.build(); } - private static ApplicationProtocolConfig newApplicationProtocolConfig() { - return new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN, - ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, - ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, - ApplicationProtocolNames.HTTP_2); + private static ApplicationProtocolConfig newApplicationProtocolConfig(HttpVersion httpVersion) { + return httpVersion.majorVersion() == 1 ? + new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_1_1) : + new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2); } public interface TransportListener { diff --git a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java index bec85c1..7ddd3a3 100644 --- a/src/main/java/org/xbib/netty/http/client/ClientBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/ClientBuilder.java @@ -192,6 +192,8 @@ public class ClientBuilder { public ClientBuilder addPoolNode(HttpAddress httpAddress) { clientConfig.addPoolNode(httpAddress); + clientConfig.setPoolVersion(httpAddress.getVersion()); + clientConfig.setPoolSecure(httpAddress.isSecure()); return this; } @@ -205,16 +207,6 @@ public class ClientBuilder { return this; } - public ClientBuilder setPoolVersion(HttpVersion poolVersion) { - clientConfig.setPoolVersion(poolVersion); - return this; - } - - public ClientBuilder setPoolSecure(boolean poolSecure) { - clientConfig.setPoolSecure(poolSecure); - return this; - } - public ClientBuilder addServerNameForIdentification(String serverName) { clientConfig.addServerNameForIdentification(serverName); return this; diff --git a/src/main/java/org/xbib/netty/http/client/HttpAddress.java b/src/main/java/org/xbib/netty/http/client/HttpAddress.java index c7ec14c..d82bc15 100644 --- a/src/main/java/org/xbib/netty/http/client/HttpAddress.java +++ b/src/main/java/org/xbib/netty/http/client/HttpAddress.java @@ -56,7 +56,7 @@ public class HttpAddress implements PoolKey { } public static HttpAddress of(Request request) { - return new HttpAddress(request.base(), request.httpVersion()); + return new HttpAddress(request.url(), request.httpVersion()); } public static HttpAddress of(URL url, HttpVersion httpVersion) { @@ -76,7 +76,7 @@ public class HttpAddress implements PoolKey { public InetSocketAddress getInetSocketAddress() { if (inetSocketAddress == null) { - // this may execute DNS + // this may execute DNS lookup this.inetSocketAddress = new InetSocketAddress(host, port); } return inetSocketAddress; diff --git a/src/main/java/org/xbib/netty/http/client/Request.java b/src/main/java/org/xbib/netty/http/client/Request.java index d593115..b52381a 100644 --- a/src/main/java/org/xbib/netty/http/client/Request.java +++ b/src/main/java/org/xbib/netty/http/client/Request.java @@ -1,6 +1,8 @@ package org.xbib.netty.http.client; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; @@ -12,8 +14,6 @@ import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpResponseListener; import org.xbib.netty.http.client.retry.BackOff; -import java.io.Closeable; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -21,9 +21,9 @@ import java.util.concurrent.CompletableFuture; /** * HTTP client request. */ -public class Request implements Closeable { +public class Request { - private final URL base; + private final URL url; private final HttpVersion httpVersion; @@ -62,7 +62,7 @@ public class Request implements Closeable { String uri, ByteBuf content, long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount, boolean isBackOff, BackOff backOff) { - this.base = url; + this.url = url; this.httpVersion = httpVersion; this.httpMethod = httpMethod; this.headers = headers; @@ -77,8 +77,8 @@ public class Request implements Closeable { this.backOff = backOff; } - public URL base() { - return base; + public URL url() { + return url; } public HttpVersion httpVersion() { @@ -139,12 +139,14 @@ public class Request implements Closeable { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("Request[base='").append(base) + sb.append("Request[url='").append(url) .append("',version=").append(httpVersion) .append(",method=").append(httpMethod) .append(",uri=").append(uri) .append(",headers=").append(headers.entries()) - .append(",content=").append(content != null ? content.copy(0,16).toString(StandardCharsets.UTF_8) : "") + .append(",content=").append(content != null && content.readableBytes() >= 16 ? + content.copy(0,16).toString(StandardCharsets.UTF_8) + "..." : + content != null ? content.toString(StandardCharsets.UTF_8) : "") .append("]"); return sb.toString(); } @@ -222,13 +224,10 @@ public class Request implements Closeable { } public static RequestBuilder builder(HttpMethod httpMethod) { - return new RequestBuilder().setMethod(httpMethod); + return new RequestBuilder(PooledByteBufAllocator.DEFAULT).setMethod(httpMethod); } - @Override - public void close() throws IOException { - if (content != null) { - content.release(); - } + public static RequestBuilder builder(ByteBufAllocator allocator, HttpMethod httpMethod) { + return new RequestBuilder(allocator).setMethod(httpMethod); } } diff --git a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java index b8dfb5f..71053f3 100644 --- a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java @@ -1,7 +1,11 @@ package org.xbib.netty.http.client; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; @@ -51,6 +55,8 @@ public class RequestBuilder { private static final HttpVersion HTTP_2_0 = HttpVersion.valueOf("HTTP/2.0"); + private final ByteBufAllocator allocator; + private final List removeHeaders; private final Collection cookies; @@ -85,7 +91,8 @@ public class RequestBuilder { private BackOff backOff; - RequestBuilder() { + RequestBuilder(ByteBufAllocator allocator) { + this.allocator = allocator; httpMethod = DEFAULT_METHOD; httpVersion = DEFAULT_HTTP_VERSION; userAgent = DEFAULT_USER_AGENT; @@ -341,11 +348,11 @@ public class RequestBuilder { } private void content(CharSequence charSequence, AsciiString contentType) { - content(charSequence.toString().getBytes(StandardCharsets.UTF_8), contentType); + content(ByteBufUtil.writeUtf8(allocator, charSequence), contentType); } private void content(byte[] buf, AsciiString contentType) { - content(PooledByteBufAllocator.DEFAULT.buffer(buf.length).writeBytes(buf), contentType); + content(allocator.buffer().writeBytes(buf), contentType); } private void content(ByteBuf body, AsciiString contentType) { diff --git a/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java b/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java index 87134bb..7fb87cb 100644 --- a/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java +++ b/src/main/java/org/xbib/netty/http/client/pool/BoundedChannelPool.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -85,10 +86,10 @@ public class BoundedChannelPool implements Pool { } this.numberOfNodes = nodes.size(); bootstraps = new HashMap<>(numberOfNodes); - channels = new HashMap<>(numberOfNodes); - availableChannels = new HashMap<>(numberOfNodes); - counts = new HashMap<>(numberOfNodes); - failedCounts = new HashMap<>(numberOfNodes); + channels = new ConcurrentHashMap<>(numberOfNodes); + availableChannels = new ConcurrentHashMap<>(numberOfNodes); + counts = new ConcurrentHashMap<>(numberOfNodes); + failedCounts = new ConcurrentHashMap<>(numberOfNodes); for (K node : nodes) { ChannelPoolInitializer initializer = new ChannelPoolInitializer(node, channelPoolHandler); bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress()) @@ -247,6 +248,9 @@ public class BoundedChannelPool implements Pool { int i = ThreadLocalRandom.current().nextInt(numberOfNodes); for (int j = i; j < numberOfNodes; j ++) { nextKey = nodes.get(j % numberOfNodes); + if (counts == null) { + throw new ConnectException("strange"); + } next = counts.get(nextKey); if (next == 0) { key = nextKey; @@ -293,9 +297,7 @@ public class BoundedChannelPool implements Pool { channel.closeFuture().addListener(new CloseChannelListener(key, channel)); channel.attr(attributeKey).set(key); channels.computeIfAbsent(key, node -> new ArrayList<>()).add(channel); - synchronized (counts) { - counts.put(key, counts.get(key) + 1); - } + counts.put(key, counts.get(key) + 1); if (retriesPerNode > 0) { failedCounts.put(key, 0); } @@ -343,16 +345,12 @@ public class BoundedChannelPool implements Pool { logger.log(Level.FINE,"connection to " + key + " closed"); lock.lock(); try { - synchronized (counts) { - if (counts.containsKey(key)) { - counts.put(key, counts.get(key) - 1); - } + if (counts.containsKey(key)) { + counts.put(key, counts.get(key) - 1); } - synchronized (channels) { - List channels = BoundedChannelPool.this.channels.get(key); - if (channels != null) { - channels.remove(channel); - } + List channels = BoundedChannelPool.this.channels.get(key); + if (channels != null) { + channels.remove(channel); } semaphore.release(); } finally { diff --git a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index 98c8ff6..91d726a 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -74,38 +74,34 @@ abstract class BaseTransport implements Transport { // Our algorithm is: use always "origin form" for HTTP 1, use absolute form for HTTP 2. // The reason is that Netty derives the HTTP/2 scheme header from the absolute form. String uri = request.httpVersion().majorVersion() == 1 ? - request.base().relativeReference() : request.base().toString(); + request.url().relativeReference() : request.url().toString(); FullHttpRequest fullHttpRequest = request.content() == null ? new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) : new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri, request.content()); - try { - Integer streamId = nextStream(); - if (streamId != null && streamId > 0) { - request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); - } else { - if (request.httpVersion().majorVersion() == 2) { - logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName()); - } + Integer streamId = nextStream(); + if (streamId != null && streamId > 0) { + request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId)); + } else { + if (request.httpVersion().majorVersion() == 2) { + logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName()); } - // add matching cookies from box (previous requests) and new cookies from request builder - Collection cookies = new ArrayList<>(); - cookies.addAll(matchCookiesFromBox(request)); - cookies.addAll(matchCookies(request)); - if (!cookies.isEmpty()) { - request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies)); - } - // add stream-id and cookie headers - fullHttpRequest.headers().set(request.headers()); - if (streamId != null) { - requests.put(streamId, request); - } - if (channel.isWritable()) { - channel.writeAndFlush(fullHttpRequest); + } + // add matching cookies from box (previous requests) and new cookies from request builder + Collection cookies = new ArrayList<>(); + cookies.addAll(matchCookiesFromBox(request)); + cookies.addAll(matchCookies(request)); + if (!cookies.isEmpty()) { + request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies)); + } + // add stream-id and cookie headers + fullHttpRequest.headers().set(request.headers()); + if (streamId != null) { + requests.put(streamId, request); + } + if (channel.isWritable()) { + channel.writeAndFlush(fullHttpRequest); - } - } finally { - request.close(); } return this; } @@ -214,14 +210,14 @@ abstract class BaseTransport implements Transport { location = new PercentDecoder(StandardCharsets.UTF_8.newDecoder()).decode(location); if (location != null) { logger.log(Level.FINE, "found redirect location: " + location); - URL redirUrl = URL.base(request.base()).resolve(location); + URL redirUrl = URL.base(request.url()).resolve(location); HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod(); RequestBuilder newHttpRequestBuilder = Request.builder(method) .url(redirUrl) .setVersion(request.httpVersion()) .setHeaders(request.headers()) .content(request.content()); - request.base().getQueryParams().forEach(pair -> + request.url().getQueryParams().forEach(pair -> newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond()) ); request.cookies().forEach(newHttpRequestBuilder::addCookie); @@ -309,13 +305,13 @@ abstract class BaseTransport implements Transport { private List matchCookiesFromBox(Request request) { return cookieBox == null ? Collections.emptyList() : cookieBox.keySet().stream().filter(cookie -> - matchCookie(request.base(), cookie) + matchCookie(request.url(), cookie) ).collect(Collectors.toList()); } private List matchCookies(Request request) { return request.cookies().stream().filter(cookie -> - matchCookie(request.base(), cookie) + matchCookie(request.url(), cookie) ).collect(Collectors.toList()); } diff --git a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java index 94b0dd2..6bbe8f1 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java @@ -69,13 +69,11 @@ public class HttpTransport extends BaseTransport { if (retryRequest != null) { // retry transport, wait for completion client.retry(this, retryRequest); - retryRequest.close(); } else { Request continueRequest = continuation(request, fullHttpResponse); if (continueRequest != null) { // continue with new transport, synchronous call here, wait for completion client.continuation(this, continueRequest); - continueRequest.close(); } } } catch (URLSyntaxException | IOException e) { @@ -94,7 +92,7 @@ public class HttpTransport extends BaseTransport { } @Override - public void awaitResponse(Integer streamId) throws IOException { + public void awaitResponse(Integer streamId) throws IOException, TimeoutException { if (streamId == null) { return; } @@ -103,9 +101,17 @@ public class HttpTransport extends BaseTransport { } CompletableFuture promise = sequentialPromiseMap.get(streamId); if (promise != null) { + long millis = client.getClientConfig().getReadTimeoutMillis(); + Request request = fromStreamId(streamId); + if (request != null && request.getTimeoutInMillis() > 0) { + millis = request.getTimeoutInMillis(); + } try { - promise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + promise.get(millis, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + this.throwable = e; + throw new TimeoutException("timeout of " + millis + " milliseconds exceeded"); + } catch (InterruptedException | ExecutionException e) { this.throwable = e; throw new IOException(e); } finally { @@ -121,7 +127,7 @@ public class HttpTransport extends BaseTransport { awaitResponse(streamId); client.releaseChannel(channel); } - } catch (IOException e) { + } catch (IOException | TimeoutException e) { logger.log(Level.WARNING, e.getMessage(), e); } finally { sequentialPromiseMap.clear(); diff --git a/src/main/java/org/xbib/netty/http/client/transport/Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Transport.java index 0fca5c1..2d3d395 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/Transport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/Transport.java @@ -12,6 +12,7 @@ import org.xbib.netty.http.client.Request; import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.function.Function; public interface Transport { @@ -38,7 +39,7 @@ public interface Transport { void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers); - void awaitResponse(Integer streamId) throws IOException; + void awaitResponse(Integer streamId) throws IOException, TimeoutException; Transport get(); diff --git a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java index e198e36..e4d2edc 100644 --- a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java @@ -14,19 +14,19 @@ import java.util.logging.Logger; public class CompletableFutureTest { - private static final Logger logger = Logger.getLogger(ElasticsearchTest.class.getName()); + private static final Logger logger = Logger.getLogger(CompletableFutureTest.class.getName()); /** * Get some weird content from one URL and post it to another URL, by composing completable futures. */ @Test public void testComposeCompletableFutures() throws IOException { - Client client = new Client(); + Client client = Client.builder().build(); try { final Function httpResponseStringFunction = response -> response.content().toString(StandardCharsets.UTF_8); Request request = Request.get() - .url("https://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1") + .url("http://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1") .build(); CompletableFuture completableFuture = client.execute(request, httpResponseStringFunction) .exceptionally(Throwable::getMessage) diff --git a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java index 9ccbe50..3c0b287 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java @@ -1,6 +1,7 @@ package org.xbib.netty.http.client.test; import org.conscrypt.Conscrypt; +import org.junit.Ignore; import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; @@ -17,13 +18,14 @@ public class ConscryptTest extends LoggingBase { @Test public void testConscrypt() throws IOException { Client client = Client.builder() + .enableDebug() .setJdkSslProvider() .setSslContextProvider(Conscrypt.newProvider()) .build(); logger.log(Level.INFO, client.getClientConfig().toString()); try { Request request = Request.get() - .url("https://xbib.org") + .url("https://google.com") .setVersion("HTTP/1.1") .build() .setResponseListener(fullHttpResponse -> { diff --git a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java index b4f6f97..cc1c789 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java @@ -3,13 +3,16 @@ package org.xbib.netty.http.client.test; import org.junit.Ignore; import org.junit.Test; import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.transport.Transport; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -22,8 +25,27 @@ public class ElasticsearchTest extends LoggingBase { private static final Logger logger = Logger.getLogger(ElasticsearchTest.class.getName()); @Test + @Ignore + public void testElasticsearch() throws IOException { + Client client = Client.builder().enableDebug().build(); + try { + Request request = Request.get().url("http://localhost:9200") + .build() + .setResponseListener(fullHttpResponse -> { + String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); + logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); + }); + logger.info("request = " + request.toString()); + client.execute(request); + } finally { + client.shutdownGracefully(); + } + } + + @Test + @Ignore public void testElasticsearchCreateDocument() throws IOException { - Client client = new Client(); + Client client = Client.builder().enableDebug().build(); try { Request request = Request.put().url("http://localhost:9200/test/test/1") .json("{\"text\":\"Hello World\"}") @@ -32,6 +54,7 @@ public class ElasticsearchTest extends LoggingBase { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); }); + logger.info("request = " + request.toString()); client.execute(request); } finally { client.shutdownGracefully(); @@ -39,6 +62,7 @@ public class ElasticsearchTest extends LoggingBase { } @Test + @Ignore public void testElasticsearchMatchQuery() throws IOException { Client client = new Client(); try { @@ -55,24 +79,46 @@ public class ElasticsearchTest extends LoggingBase { } } + /** + * This shows the usage of 4 concurrent pooled connections on 4 threads, querying Elasticsearch. + * @throws IOException if test fails + */ @Test - public void testElasticsearchConcurrent() throws IOException { - Client client = Client.builder().setReadTimeoutMillis(20000).build(); + public void testElasticsearchPooled() throws IOException { + HttpAddress httpAddress = HttpAddress.http1("localhost", 9200); + int limit = 4; + Client client = Client.builder() + .addPoolNode(httpAddress) + .setPoolNodeConnectionLimit(limit) + .build(); int max = 1000; + int threads = 4; try { - List queries = new ArrayList<>(); - for (int i = 0; i < max; i++) { - queries.add(newRequest()); - } - Transport transport = client.execute(queries.get(0)).get(); - for (int i = 1; i < max; i++) { - transport.execute(queries.get(i)).get(); + ExecutorService executorService = Executors.newFixedThreadPool(threads); + for (int n = 0; n < threads; n++) { + executorService.submit(() -> { + List queries = new ArrayList<>(); + for (int i = 0; i < max; i++) { + queries.add(newRequest()); + } + try { + for (int i = 0; i < max; i++) { + client.pooledExecute(queries.get(i)).get(); + } + } catch (IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + }); } + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.log(Level.WARNING, e.getMessage(), e); } finally { client.shutdownGracefully(); - logger.log(Level.INFO, "count=" + count); } - assertEquals(max, count.get()); + logger.log(Level.INFO, "count=" + count); + assertEquals(max * threads, count.get()); } private Request newRequest() { @@ -81,10 +127,12 @@ public class ElasticsearchTest extends LoggingBase { .json("{\"query\":{\"match\":{\"text\":\"Hello World\"}}}") .addHeader("connection", "keep-alive") .build() - .setResponseListener(fullHttpResponse -> - logger.log(Level.FINE, "status = " + fullHttpResponse.status() + - " counter = " + count.getAndIncrement() + - " response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8))); + .setResponseListener(fullHttpResponse -> { + count.getAndIncrement(); + if (fullHttpResponse.status().code() != 200) { + logger.log(Level.WARNING,"error: " + fullHttpResponse.toString()); + } + }); } private final AtomicInteger count = new AtomicInteger(); diff --git a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java index 09f51d4..18a3853 100644 --- a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java +++ b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java @@ -1,15 +1,12 @@ package org.xbib.netty.http.client.test; import io.netty.handler.codec.http.HttpMethod; -import org.junit.After; -import org.junit.Ignore; import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -17,16 +14,9 @@ public class Http1Test extends LoggingBase { private static final Logger logger = Logger.getLogger(Http1Test.class.getName()); - @After - public void checkThreads() { - Set threadSet = Thread.getAllStackTraces().keySet(); - logger.log(Level.INFO, "threads = " + threadSet.size() ); - threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString())); - } - @Test public void testHttp1() throws Exception { - Client client = new Client(); + Client client = Client.builder().enableDebug().build(); try { Request request = Request.get().url("http://xbib.org").build() .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + @@ -40,7 +30,6 @@ public class Http1Test extends LoggingBase { } @Test - @Ignore public void testParallelRequests() throws IOException { Client client = Client.builder().enableDebug().build(); try { @@ -70,7 +59,6 @@ public class Http1Test extends LoggingBase { } @Test - @Ignore public void testSequentialRequests() throws Exception { Client client = Client.builder().enableDebug().build(); try { @@ -79,7 +67,7 @@ public class Http1Test extends LoggingBase { msg.content().toString(StandardCharsets.UTF_8))); client.execute(request1).get(); - Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build() + Request request2 = Request.get().url("http://google.com").setVersion("HTTP/1.1").build() .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + msg.content().toString(StandardCharsets.UTF_8))); client.execute(request2).get(); diff --git a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java index 94c4cad..4b45574 100644 --- a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java +++ b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java @@ -25,7 +25,7 @@ public class Http2Test extends LoggingBase { * * demo/h2_demo_frame.html sends no content, only a push promise, and does not continue * - * @throws IOException + * @throws IOException if test fails */ @Test @Ignore diff --git a/src/test/java/org/xbib/netty/http/client/test/LeakTest.java b/src/test/java/org/xbib/netty/http/client/test/LeakTest.java index b8361d0..fefb811 100644 --- a/src/test/java/org/xbib/netty/http/client/test/LeakTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/LeakTest.java @@ -21,7 +21,7 @@ public class LeakTest { } @Test - public void testForLeaks() throws IOException, InterruptedException { + public void testForLeaks() throws IOException { Client client = new Client(); client.shutdownGracefully(); } diff --git a/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java b/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java index 1ef9c34..6756d1f 100644 --- a/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/PooledClientTest.java @@ -1,6 +1,8 @@ package org.xbib.netty.http.client.test; +import io.netty.handler.codec.http.HttpVersion; import org.junit.Test; +import org.xbib.net.URL; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; @@ -21,15 +23,15 @@ public class PooledClientTest extends LoggingBase { @Test public void testPooledClientWithSingleNode() throws IOException { int loop = 10; - HttpAddress httpAddress = HttpAddress.http1("xbib.org", 80); + int threads = Runtime.getRuntime().availableProcessors(); + URL url = URL.from("http://xbib.org"); + HttpAddress httpAddress = HttpAddress.of(url, HttpVersion.HTTP_1_1); Client client = Client.builder() .addPoolNode(httpAddress) - .setPoolSecure(httpAddress.isSecure()) - .setPoolNodeConnectionLimit(16) + .setPoolNodeConnectionLimit(threads) .build(); AtomicInteger count = new AtomicInteger(); try { - int threads = 16; ExecutorService executorService = Executors.newFixedThreadPool(threads); for (int n = 0; n < threads; n++) { executorService.submit(() -> { @@ -37,25 +39,26 @@ public class PooledClientTest extends LoggingBase { logger.log(Level.INFO, "starting " + Thread.currentThread()); for (int i = 0; i < loop; i++) { Request request = Request.get() - .url("http://xbib.org/repository/") - .setVersion("HTTP/1.1") + .url(url.toString()) + .setVersion(httpAddress.getVersion()) + //.setTimeoutInMillis(25000L) .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); //logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); + count.getAndIncrement(); }); client.pooledExecute(request).get(); - count.getAndIncrement(); } logger.log(Level.INFO, "done " + Thread.currentThread()); - } catch (IOException e) { + } catch (Throwable e) { logger.log(Level.WARNING, e.getMessage(), e); } }); } executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { + } catch (Throwable e) { logger.log(Level.WARNING, e.getMessage(), e); } finally { client.shutdownGracefully(); diff --git a/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java b/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java index c649630..6faa6e5 100644 --- a/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java @@ -13,7 +13,7 @@ public class RequestBuilderTest { @Test public void testSimpleRequest() { - Request request = Request.builder(HttpMethod.GET).build(); + Request request = Request.builder(HttpMethod.GET).content("Hello", "text/plain").build(); logger.log(Level.INFO, request.toString()); } } diff --git a/src/test/java/org/xbib/netty/http/client/test/SecureHttp1Test.java b/src/test/java/org/xbib/netty/http/client/test/SecureHttp1Test.java new file mode 100644 index 0000000..f3c40ac --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/SecureHttp1Test.java @@ -0,0 +1,81 @@ +package org.xbib.netty.http.client.test; + +import io.netty.handler.codec.http.HttpMethod; +import org.junit.Ignore; +import org.junit.Test; +import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.Request; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class SecureHttp1Test extends LoggingBase { + + private static final Logger logger = Logger.getLogger(SecureHttp1Test.class.getName()); + + @Test + public void testHttp1() throws Exception { + Client client = Client.builder().enableDebug().build(); + try { + Request request = Request.get().url("https://www.google.com/").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + client.execute(request).get(); + } finally { + client.shutdown(); + } + } + + @Test + @Ignore + public void testParallelRequests() throws IOException { + Client client = Client.builder().enableDebug().build(); + try { + Request request1 = Request.builder(HttpMethod.GET) + .url("https://google.com").setVersion("HTTP/1.1") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + Request request2 = Request.builder(HttpMethod.GET) + .url("https://google.com").setVersion("HTTP/1.1") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + + for (int i = 0; i < 10; i++) { + client.execute(request1); + client.execute(request2); + } + + } finally { + client.shutdownGracefully(); + } + } + + @Test + @Ignore + public void testSequentialRequests() throws Exception { + Client client = Client.builder().enableDebug().build(); + try { + Request request1 = Request.get().url("https://google.com").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.content().toString(StandardCharsets.UTF_8))); + client.execute(request1).get(); + + Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.content().toString(StandardCharsets.UTF_8))); + client.execute(request2).get(); + } finally { + client.shutdown(); + } + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java index 658b8e1..08b9ac3 100644 --- a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java @@ -40,14 +40,14 @@ public class EpollTest { private static final Logger logger = Logger.getLogger(EpollTest.class.getName()); - private static final int CONCURRENCY = 10; + private static final int CONCURRENCY = 4; private static final List NODES = Collections.singletonList(HttpAddress.http1("localhost", 12345)); private static final long TEST_TIME_SECONDS = 100; - private static final int ATTEMPTS = 10_000; + private static final int ATTEMPTS = 1_000; private static final int FAIL_EVERY_ATTEMPT = 10; diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java index 5be5cbc..d27217d 100644 --- a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java @@ -37,14 +37,14 @@ public class NioTest { private static final Logger logger = Logger.getLogger(NioTest.class.getName()); - private static final int CONCURRENCY = 10; + private static final int CONCURRENCY = 4; private static final List NODES = Collections.singletonList(HttpAddress.http1("localhost", 12345)); private static final long TEST_TIME_SECONDS = 100; - private static final int ATTEMPTS = 10_000; + private static final int ATTEMPTS = 1_000; private static final int FAIL_EVERY_ATTEMPT = 10; diff --git a/src/test/java/org/xbib/netty/http/client/test/simple/Http2FramesTest.java b/src/test/java/org/xbib/netty/http/client/test/simple/Http2FramesTest.java index 386e006..9ae6ef6 100644 --- a/src/test/java/org/xbib/netty/http/client/test/simple/Http2FramesTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/simple/Http2FramesTest.java @@ -44,7 +44,6 @@ public class Http2FramesTest { private static final Logger logger = Logger.getLogger(Http2FramesTest.class.getName()); @Test - @Ignore public void testHttp2Frames() throws Exception { final InetSocketAddress inetSocketAddress = new InetSocketAddress("webtide.com", 443); CompletableFuture completableFuture = new CompletableFuture<>(); @@ -52,59 +51,59 @@ public class Http2FramesTest { Channel clientChannel = null; try { Bootstrap bootstrap = new Bootstrap() - .group(eventLoopGroup) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + SslContext sslContext = SslContextBuilder.forClient() + .sslProvider(SslProvider.JDK) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)) + .build(); + SslHandler sslHandler = sslContext.newHandler(ch.alloc()); + SSLEngine engine = sslHandler.engine(); + String fullQualifiedHostname = inetSocketAddress.getHostName(); + SSLParameters params = engine.getSSLParameters(); + params.setServerNames(Arrays.asList(new SNIServerName[]{new SNIHostName(fullQualifiedHostname)})); + engine.setSSLParameters(params); + ch.pipeline().addLast(sslHandler); + Http2FrameAdapter frameAdapter = new Http2FrameAdapter() { @Override - protected void initChannel(Channel ch) throws Exception { - SslContext sslContext = SslContextBuilder.forClient() - .sslProvider(SslProvider.JDK) - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) - .applicationProtocolConfig(new ApplicationProtocolConfig( - ApplicationProtocolConfig.Protocol.ALPN, - ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, - ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, - ApplicationProtocolNames.HTTP_2)) - .build(); - SslHandler sslHandler = sslContext.newHandler(ch.alloc()); - SSLEngine engine = sslHandler.engine(); - String fullQualifiedHostname = inetSocketAddress.getHostName(); - SSLParameters params = engine.getSSLParameters(); - params.setServerNames(Arrays.asList(new SNIServerName[]{new SNIHostName(fullQualifiedHostname)})); - engine.setSSLParameters(params); - ch.pipeline().addLast(sslHandler); - Http2FrameAdapter frameAdapter = new Http2FrameAdapter() { - @Override - public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { - logger.log(Level.FINE, "settings received, now writing request"); - Http2ConnectionHandler handler = ctx.pipeline().get(Http2ConnectionHandler.class); - handler.encoder().writeHeaders(ctx, 3, - new DefaultHttp2Headers().method(HttpMethod.GET.asciiName()) - .path("/") - .scheme("https") - .authority(inetSocketAddress.getHostName()), - 0, true, ctx.newPromise()); - ctx.channel().flush(); - } - - @Override - public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception { - int i = super.onDataRead(ctx, streamId, data, padding, endOfStream); - if (endOfStream) { - completableFuture.complete(true); - } - return i; - } - }; - Http2ConnectionHandlerBuilder builder = new Http2ConnectionHandlerBuilder() - .server(false) - .frameListener(frameAdapter) - .frameLogger(new Http2FrameLogger(LogLevel.INFO, "client")); - ch.pipeline().addLast(builder.build()); + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { + logger.log(Level.FINE, "settings received, now writing request"); + Http2ConnectionHandler handler = ctx.pipeline().get(Http2ConnectionHandler.class); + handler.encoder().writeHeaders(ctx, 3, + new DefaultHttp2Headers().method(HttpMethod.GET.asciiName()) + .path("/") + .scheme("https") + .authority(inetSocketAddress.getHostName()), + 0, true, ctx.newPromise()); + ctx.channel().flush(); } - }); + + @Override + public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { + int i = super.onDataRead(ctx, streamId, data, padding, endOfStream); + if (endOfStream) { + completableFuture.complete(true); + } + return i; + } + }; + Http2ConnectionHandlerBuilder builder = new Http2ConnectionHandlerBuilder() + .server(false) + .frameListener(frameAdapter) + .frameLogger(new Http2FrameLogger(LogLevel.INFO, "client")); + ch.pipeline().addLast(builder.build()); + } + }); logger.log(Level.INFO, () -> "connecting"); clientChannel = bootstrap.connect(inetSocketAddress).sync().channel(); logger.log(Level.INFO, () -> "waiting for end of stream");