diff --git a/build.gradle b/build.gradle index 7615225..31bfb88 100644 --- a/build.gradle +++ b/build.gradle @@ -37,6 +37,7 @@ dependencies { compile "org.xbib:net-url:${project.property('xbib-net-url.version')}" compile "io.netty:netty-codec-http2:${project.property('netty.version')}" compile "io.netty:netty-handler-proxy:${project.property('netty.version')}" + testCompile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}" testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}" testCompile "junit:junit:${project.property('junit.version')}" diff --git a/gradle.properties b/gradle.properties index 3885d55..8bbb631 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,8 +1,8 @@ group = org.xbib name = netty-http-client -version = 4.1.19.1 +version = 4.1.16.0 -netty.version = 4.1.19.Final +netty.version = 4.1.16.Final tcnative.version = 2.0.7.Final conscrypt.version = 1.0.1 xbib-net-url.version = 1.1.0 diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 8252d7e..f6b961f 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 454cfef..e65c9e2 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Sun Feb 25 12:39:15 CET 2018 +#Fri Mar 02 19:15:04 CET 2018 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-rc-1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-all.zip diff --git a/src/main/java/org/xbib/netty/http/client/Client.java b/src/main/java/org/xbib/netty/http/client/Client.java index 30579f2..532b598 100644 --- a/src/main/java/org/xbib/netty/http/client/Client.java +++ b/src/main/java/org/xbib/netty/http/client/Client.java @@ -18,15 +18,21 @@ import org.xbib.netty.http.client.handler.http1.HttpResponseHandler; import org.xbib.netty.http.client.handler.http2.Http2ChannelInitializer; import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler; import org.xbib.netty.http.client.handler.http2.Http2SettingsHandler; +import org.xbib.netty.http.client.pool.Pool; +import org.xbib.netty.http.client.pool.SimpleChannelPool; import org.xbib.netty.http.client.transport.Http2Transport; import org.xbib.netty.http.client.transport.HttpTransport; import org.xbib.netty.http.client.transport.Transport; import org.xbib.netty.http.client.util.NetworkUtils; +import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; +import java.security.KeyStoreException; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.function.Function; import java.util.logging.Level; @@ -62,6 +68,8 @@ public final class Client { private TransportListener transportListener; + private Pool pool; + public Client() { this(new ClientConfig()); } @@ -74,6 +82,7 @@ public final class Client { EventLoopGroup eventLoopGroup, Class socketChannelClass) { Objects.requireNonNull(clientConfig); this.clientConfig = clientConfig; + initializeTrustManagerFactory(clientConfig); this.byteBufAllocator = byteBufAllocator != null ? byteBufAllocator : PooledByteBufAllocator.DEFAULT; this.eventLoopGroup = eventLoopGroup != null ? @@ -94,6 +103,22 @@ public final class Client { this.http2SettingsHandler = new Http2SettingsHandler(); this.http2ResponseHandler = new Http2ResponseHandler(); this.transports = new CopyOnWriteArrayList<>(); + List nodes = clientConfig.getNodes(); + if (!nodes.isEmpty()) { + Integer limit = clientConfig.getNodeConnectionLimit(); + if (limit == null || limit > nodes.size()) { + limit = nodes.size(); + } + if (limit < 1) { + limit = 1; + } + Semaphore semaphore = new Semaphore(limit); + Integer retries = clientConfig.getRetriesPerNode(); + if (retries == null || retries < 0) { + retries = 0; + } + this.pool = new SimpleChannelPool<>(semaphore, nodes, bootstrap, null, retries); + } } public static ClientBuilder builder() { @@ -140,23 +165,28 @@ public final class Client { public Channel newChannel(HttpAddress httpAddress) throws InterruptedException { HttpVersion httpVersion = httpAddress.getVersion(); ChannelInitializer initializer; + Channel channel; if (httpVersion.majorVersion() < 2) { initializer = new HttpChannelInitializer(clientConfig, httpAddress, httpResponseHandler); + channel = bootstrap.handler(initializer) + .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); } else { - initializer = new Http2ChannelInitializer(clientConfig, httpAddress, http2SettingsHandler, http2ResponseHandler); + initializer = new Http2ChannelInitializer(clientConfig, httpAddress, + http2SettingsHandler, http2ResponseHandler); + channel = bootstrap.handler(initializer) + .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); } - return bootstrap.handler(initializer) - .connect(httpAddress.getInetSocketAddress()).sync().await().channel(); + return channel; } - public Transport execute(Request request) { - Transport nextTransport = newTransport(HttpAddress.of(request)); - nextTransport.execute(request); - return nextTransport; + public Transport execute(Request request) throws IOException { + Transport transport = newTransport(HttpAddress.of(request)); + transport.execute(request); + return transport; } public CompletableFuture execute(Request request, - Function supplier) { + Function supplier) throws IOException { return newTransport(HttpAddress.of(request)).execute(request, supplier); } @@ -165,19 +195,19 @@ public final class Client { * @param transport the previous transport * @param request the new request for continuing the request. */ - public void continuation(Transport transport, Request request) { + public void continuation(Transport transport, Request request) throws IOException { Transport nextTransport = newTransport(HttpAddress.of(request)); - nextTransport.setResponseListener(transport.getResponseListener()); - nextTransport.setExceptionListener(transport.getExceptionListener()); - nextTransport.setHeadersListener(transport.getHeadersListener()); - nextTransport.setCookieListener(transport.getCookieListener()); - nextTransport.setPushListener(transport.getPushListener()); nextTransport.setCookieBox(transport.getCookieBox()); nextTransport.execute(request); nextTransport.get(); close(nextTransport); } + public void retry(Transport transport, Request request) throws IOException { + transport.execute(request); + transport.get(); + close(transport); + } public Transport prepareRequest(Request request) { return newTransport(HttpAddress.of(request)); @@ -206,6 +236,21 @@ public final class Client { shutdown(); } + /** + * Initialize trust manager factory once per client lifecycle. + * @param clientConfig the client config + */ + private static void initializeTrustManagerFactory(ClientConfig clientConfig) { + TrustManagerFactory trustManagerFactory = clientConfig.getTrustManagerFactory(); + if (trustManagerFactory != null) { + try { + trustManagerFactory.init(clientConfig.getTrustManagerKeyStore()); + } catch (KeyStoreException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + } + } + public interface TransportListener { void onOpen(Transport transport); diff --git a/src/main/java/org/xbib/netty/http/client/ClientConfig.java b/src/main/java/org/xbib/netty/http/client/ClientConfig.java index eef73fc..d45ae1f 100644 --- a/src/main/java/org/xbib/netty/http/client/ClientConfig.java +++ b/src/main/java/org/xbib/netty/http/client/ClientConfig.java @@ -3,14 +3,14 @@ package org.xbib.netty.http.client; import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.ssl.CipherSuiteFilter; -import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SupportedCipherSuiteFilter; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import javax.net.ssl.TrustManagerFactory; import java.io.InputStream; +import java.security.KeyStore; import java.security.Provider; +import java.util.List; public class ClientConfig { @@ -110,11 +110,6 @@ public class ClientConfig { */ CipherSuiteFilter CIPHER_SUITE_FILTER = SupportedCipherSuiteFilter.INSTANCE; - /** - * Default trust manager factory. - */ - TrustManagerFactory TRUST_MANAGER_FACTORY = InsecureTrustManagerFactory.INSTANCE; - boolean USE_SERVER_NAME_IDENTIFICATION = true; /** @@ -123,6 +118,20 @@ public class ClientConfig { ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE; } + private static TrustManagerFactory TRUST_MANAGER_FACTORY; + + static { + try { + TRUST_MANAGER_FACTORY = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + //InsecureTrustManagerFactory.INSTANCE; + //TRUST_MANAGER_FACTORY.init((KeyStore) null); + // java.lang.IllegalStateException: TrustManagerFactoryImpl is not initialized + //TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + } catch (Exception e) { + TRUST_MANAGER_FACTORY = null; + } + } + private boolean debug = Defaults.DEBUG; /** @@ -165,7 +174,9 @@ public class ClientConfig { private CipherSuiteFilter cipherSuiteFilter = Defaults.CIPHER_SUITE_FILTER; - private TrustManagerFactory trustManagerFactory = Defaults.TRUST_MANAGER_FACTORY; + private TrustManagerFactory trustManagerFactory = TRUST_MANAGER_FACTORY; + + private KeyStore trustManagerKeyStore = null; private boolean serverNameIdentification = Defaults.USE_SERVER_NAME_IDENTIFICATION; @@ -179,6 +190,12 @@ public class ClientConfig { private HttpProxyHandler httpProxyHandler; + private List nodes; + + private Integer nodeConnectionLimit; + + private Integer retriesPerNode; + public ClientConfig setDebug(boolean debug) { this.debug = debug; return this; @@ -370,15 +387,6 @@ public class ClientConfig { return cipherSuiteFilter; } - public ClientConfig setTrustManagerFactory(TrustManagerFactory trustManagerFactory) { - this.trustManagerFactory = trustManagerFactory; - return this; - } - - public TrustManagerFactory getTrustManagerFactory() { - return trustManagerFactory; - } - public ClientConfig setKeyCert(InputStream keyCertChainInputStream, InputStream keyInputStream) { this.keyCertChainInputStream = keyCertChainInputStream; this.keyInputStream = keyInputStream; @@ -423,6 +431,24 @@ public class ClientConfig { return clientAuthMode; } + public ClientConfig setTrustManagerFactory(TrustManagerFactory trustManagerFactory) { + this.trustManagerFactory = trustManagerFactory; + return this; + } + + public TrustManagerFactory getTrustManagerFactory() { + return trustManagerFactory; + } + + public ClientConfig setTrustManagerKeyStore(KeyStore trustManagerKeyStore) { + this.trustManagerKeyStore = trustManagerKeyStore; + return this; + } + + public KeyStore getTrustManagerKeyStore() { + return trustManagerKeyStore; + } + public ClientConfig setHttpProxyHandler(HttpProxyHandler httpProxyHandler) { this.httpProxyHandler = httpProxyHandler; return this; @@ -432,6 +458,33 @@ public class ClientConfig { return httpProxyHandler; } + public ClientConfig setNodes(List nodes) { + this.nodes = nodes; + return this; + } + + public List getNodes() { + return nodes; + } + + public ClientConfig setNodeConnectionLimit(Integer nodeConnectionLimit) { + this.nodeConnectionLimit = nodeConnectionLimit; + return this; + } + + public Integer getNodeConnectionLimit() { + return nodeConnectionLimit; + } + + public ClientConfig setRetriesPerNode(Integer retriesPerNode) { + this.retriesPerNode = retriesPerNode; + return this; + } + + public Integer getRetriesPerNode() { + return retriesPerNode; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/org/xbib/netty/http/client/HttpAddress.java b/src/main/java/org/xbib/netty/http/client/HttpAddress.java index a3629d9..c7ec14c 100644 --- a/src/main/java/org/xbib/netty/http/client/HttpAddress.java +++ b/src/main/java/org/xbib/netty/http/client/HttpAddress.java @@ -2,13 +2,14 @@ package org.xbib.netty.http.client; import io.netty.handler.codec.http.HttpVersion; import org.xbib.net.URL; +import org.xbib.netty.http.client.pool.PoolKey; import java.net.InetSocketAddress; /** * A handle for host, port, HTTP version, secure transport flag of a channel for HTTP. */ -public class HttpAddress { +public class HttpAddress implements PoolKey { private static final HttpVersion HTTP_2_0 = HttpVersion.valueOf("HTTP/2.0"); diff --git a/src/main/java/org/xbib/netty/http/client/Request.java b/src/main/java/org/xbib/netty/http/client/Request.java index d4ab178..cfe76e2 100644 --- a/src/main/java/org/xbib/netty/http/client/Request.java +++ b/src/main/java/org/xbib/netty/http/client/Request.java @@ -8,9 +8,7 @@ import io.netty.handler.codec.http.cookie.Cookie; import org.xbib.net.URL; import org.xbib.netty.http.client.listener.CookieListener; -import org.xbib.netty.http.client.listener.ExceptionListener; import org.xbib.netty.http.client.listener.HttpHeadersListener; -import org.xbib.netty.http.client.listener.HttpPushListener; import org.xbib.netty.http.client.listener.HttpResponseListener; import java.nio.charset.StandardCharsets; @@ -45,14 +43,10 @@ public class Request { private HttpResponseListener responseListener; - private ExceptionListener exceptionListener; - private HttpHeadersListener headersListener; private CookieListener cookieListener; - private HttpPushListener pushListener; - Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod, HttpHeaders headers, Collection cookies, String uri, ByteBuf content, @@ -120,12 +114,13 @@ public class Request { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("base=").append(base).append(',') - .append("version=").append(httpVersion).append(',') - .append("method=").append(httpMethod).append(',') - .append("relativeUri=").append(uri).append(',') - .append("headers=").append(headers).append(',') - .append("content=").append(content != null ? content.copy(0,16).toString(StandardCharsets.UTF_8) : ""); + sb.append("Request[base='").append(base) + .append("',version=").append(httpVersion) + .append(",method=").append(httpMethod) + .append(",uri=").append(uri) + .append(",headers=").append(headers.entries()) + .append(",content=").append(content != null ? content.copy(0,16).toString(StandardCharsets.UTF_8) : "") + .append("]"); return sb.toString(); } @@ -156,24 +151,6 @@ public class Request { return responseListener; } - public Request setExceptionListener(ExceptionListener exceptionListener) { - this.exceptionListener = exceptionListener; - return this; - } - - public ExceptionListener getExceptionListener() { - return exceptionListener; - } - - public Request setPushListener(HttpPushListener httpPushListener) { - this.pushListener = httpPushListener; - return this; - } - - public HttpPushListener getPushListener() { - return pushListener; - } - public static RequestBuilder get() { return builder(HttpMethod.GET); } diff --git a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java index c4ec99b..a895b8e 100644 --- a/src/main/java/org/xbib/netty/http/client/RequestBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/RequestBuilder.java @@ -13,6 +13,7 @@ import io.netty.handler.codec.http.QueryStringEncoder; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.util.AsciiString; +import org.xbib.net.QueryParameters; import org.xbib.net.URL; import org.xbib.net.URLSyntaxException; @@ -67,7 +68,9 @@ public class RequestBuilder { private URL url; - private QueryStringEncoder queryStringEncoder; + private String uri; + + private QueryParameters queryParameters; private ByteBuf content; @@ -90,6 +93,7 @@ public class RequestBuilder { headers = new DefaultHttpHeaders(); removeHeaders = new ArrayList<>(); cookies = new HashSet<>(); + queryParameters = new QueryParameters(); } public RequestBuilder setMethod(HttpMethod httpMethod) { @@ -97,12 +101,12 @@ public class RequestBuilder { return this; } - public RequestBuilder setHttp1() { + public RequestBuilder enableHttp1() { this.httpVersion = HttpVersion.HTTP_1_1; return this; } - public RequestBuilder setHttp2() { + public RequestBuilder enableHttp2() { this.httpVersion = HTTP_2_0; return this; } @@ -122,32 +126,27 @@ public class RequestBuilder { return this; } - public RequestBuilder setURL(String url) { - return setURL(URL.from(url)); - } - - public RequestBuilder setURL(URL url) { - this.url = url; - QueryStringDecoder queryStringDecoder = new QueryStringDecoder(URI.create(url.toString()), StandardCharsets.UTF_8); - this.queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path()); - for (Map.Entry> entry : queryStringDecoder.parameters().entrySet()) { - for (String value : entry.getValue()) { - queryStringEncoder.addParam(entry.getKey(), value); - } - } + public RequestBuilder remoteAddress(HttpAddress httpAddress) { + this.url = URL.builder() + .scheme(httpAddress.isSecure() ? "https" : "http") + .host(httpAddress.getInetSocketAddress().getHostString()) + .port(httpAddress.getInetSocketAddress().getPort()) + .build(); + this.httpVersion = httpAddress.getVersion(); return this; } - public RequestBuilder path(String path) { - if (this.url != null) { - try { - setURL(URL.base(url).resolve(path).toString()); - } catch (URLSyntaxException e) { - throw new IllegalArgumentException(e); - } - } else { - setURL(path); - } + public RequestBuilder url(String url) { + return url(URL.from(url)); + } + + public RequestBuilder url(URL url) { + this.url = url; + return this; + } + + public RequestBuilder uri(String uri) { + this.uri = uri; return this; } @@ -171,9 +170,9 @@ public class RequestBuilder { return this; } - public RequestBuilder addParam(String name, String value) { - if (queryStringEncoder != null) { - queryStringEncoder.addParam(name, value); + public RequestBuilder addParameter(String name, String value) { + if (queryParameters != null) { + queryParameters.add(name, value); } return this; } @@ -213,11 +212,6 @@ public class RequestBuilder { return this; } - public RequestBuilder setContent(ByteBuf byteBuf) { - this.content = byteBuf; - return this; - } - public RequestBuilder text(String text) { content(text, HttpHeaderValues.TEXT_PLAIN); return this; @@ -233,6 +227,11 @@ public class RequestBuilder { return this; } + public RequestBuilder content(ByteBuf byteBuf) { + this.content = byteBuf; + return this; + } + public RequestBuilder content(CharSequence charSequence, String contentType) { content(charSequence.toString().getBytes(StandardCharsets.UTF_8), AsciiString.of(contentType)); return this; @@ -253,8 +252,38 @@ public class RequestBuilder { throw new IllegalStateException("URL not set"); } if (url.getHost() == null) { - throw new IllegalStateException("URL host not set: " + url); + throw new IllegalStateException("host in URL not defined: " + url); } + if (uri != null) { + if (this.url != null) { + try { + url = URL.base(url).resolve(uri); + } catch (URLSyntaxException e) { + throw new IllegalArgumentException(e); + } + } else { + url(uri); + } + } + // add explicit parameters to URL + queryParameters.forEach(param -> url.getQueryParams().add(param)); + // let Netty's query string decoder/encoder work over the URL to add paramters given implicitly in url() + QueryStringDecoder queryStringDecoder = new QueryStringDecoder(URI.create(url.toString()), StandardCharsets.UTF_8); + QueryStringEncoder queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path()); + for (Map.Entry> entry : queryStringDecoder.parameters().entrySet()) { + for (String value : entry.getValue()) { + queryStringEncoder.addParam(entry.getKey(), value); + } + } + // build uri from QueryStringDecoder + StringBuilder sb = new StringBuilder(); + String pathAndQuery = queryStringEncoder.toString(); + sb.append(pathAndQuery.isEmpty() ? "/" : pathAndQuery); + String ref = url.getFragment(); + if (ref != null && !ref.isEmpty()) { + sb.append('#').append(ref); + } + String uri = sb.toString(); DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true); validatedHeaders.set(headers); String scheme = url.getScheme(); @@ -290,23 +319,10 @@ public class RequestBuilder { for (String headerName : removeHeaders) { validatedHeaders.remove(headerName); } - // create origin form from query string encoder - String uri = toOriginForm(); return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, uri, content, timeout, followRedirect, maxRedirects, 0); } - private String toOriginForm() { - StringBuilder sb = new StringBuilder(); - String pathAndQuery = queryStringEncoder.toString(); - sb.append(pathAndQuery.isEmpty() ? "/" : pathAndQuery); - String ref = url.getFragment(); - if (ref != null && !ref.isEmpty()) { - sb.append('#').append(ref); - } - return sb.toString(); - } - private void addHeader(AsciiString name, Object value) { if (!headers.contains(name)) { headers.add(name, value); diff --git a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java index 975298b..850d17d 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http1/HttpChannelInitializer.java @@ -9,7 +9,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedWriteHandler; import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.handler.TrafficLoggingHandler; @@ -51,11 +50,15 @@ public class HttpChannelInitializer extends ChannelInitializer { try { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() .sslProvider(clientConfig.getSslProvider()) - .sslContextProvider(clientConfig.getSslContextProvider()) .keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(), clientConfig.getKeyPassword()) - .ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter()) - .trustManager(clientConfig.getTrustManagerFactory()); + .ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter()); + if (clientConfig.getSslContextProvider() != null) { + sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); + } + if (clientConfig.getTrustManagerFactory() != null) { + sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory()); + } SslContext sslContext = sslContextBuilder.build(); SslHandler sslHandler = sslContext.newHandler(ch.alloc()); SSLEngine engine = sslHandler.engine(); diff --git a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java index 4d5ec95..c4493aa 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java @@ -4,7 +4,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; +import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2SecurityUtil; @@ -18,7 +18,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SupportedCipherSuiteFilter; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.HttpAddress; @@ -60,10 +59,10 @@ public class Http2ChannelInitializer extends ChannelInitializer { */ @Override protected void initChannel(SocketChannel ch) { - DefaultHttp2Connection http2Connection = new DefaultHttp2Connection(false); + Http2Connection http2Connection = new DefaultHttp2Connection(false); HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder() .connection(http2Connection) - .frameListener(new DelegatingDecompressorFrameListener(http2Connection, + .frameListener(new Http2PushPromiseHandler(http2Connection, new InboundHttp2ToHttpAdapterBuilder(http2Connection) .maxContentLength(clientConfig.getMaxContentLength()) .propagateSettings(true) @@ -75,7 +74,6 @@ public class Http2ChannelInitializer extends ChannelInitializer { try { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient() .sslProvider(clientConfig.getSslProvider()) - .trustManager(InsecureTrustManagerFactory.INSTANCE) .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) .applicationProtocolConfig(new ApplicationProtocolConfig( ApplicationProtocolConfig.Protocol.ALPN, @@ -85,6 +83,9 @@ public class Http2ChannelInitializer extends ChannelInitializer { if (clientConfig.getSslContextProvider() != null) { sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider()); } + if (clientConfig.getTrustManagerFactory() != null) { + sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory()); + } SslContext sslContext = sslContextBuilder.build(); SslHandler sslHandler = sslContext.newHandler(ch.alloc()); SSLEngine engine = sslHandler.engine(); diff --git a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2PushPromiseHandler.java b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2PushPromiseHandler.java new file mode 100644 index 0000000..10f98bf --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2PushPromiseHandler.java @@ -0,0 +1,24 @@ +package org.xbib.netty.http.client.handler.http2; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameListener; +import io.netty.handler.codec.http2.Http2Headers; +import org.xbib.netty.http.client.transport.Transport; + +public class Http2PushPromiseHandler extends DelegatingDecompressorFrameListener { + + public Http2PushPromiseHandler(Http2Connection connection, Http2FrameListener listener) { + super(connection, listener); + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, + Http2Headers headers, int padding) throws Http2Exception { + super.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding); + Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get(); + transport.pushPromiseReceived(streamId, promisedStreamId, headers); + } +} diff --git a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ResponseHandler.java b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ResponseHandler.java index 5f36c50..412b776 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ResponseHandler.java +++ b/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ResponseHandler.java @@ -20,11 +20,6 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler NODE_ATTRIBUTE_KEY = AttributeKey.valueOf("node"); - - void prepare(int count) throws ConnectException; - - Channel lease() throws ConnectException; - - int lease(List channels, int maxCount) throws ConnectException; - - void release(Channel channel); - - void release(List channels); -} diff --git a/src/main/java/org/xbib/netty/http/client/pool/Pool.java b/src/main/java/org/xbib/netty/http/client/pool/Pool.java new file mode 100644 index 0000000..041759c --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/pool/Pool.java @@ -0,0 +1,17 @@ +package org.xbib.netty.http.client.pool; + +import java.io.Closeable; +import java.util.List; + +public interface Pool extends Closeable { + + void prepare(int count) throws Exception; + + T acquire() throws Exception; + + int acquire(List list, int maxCount) throws Exception; + + void release(T t) throws Exception; + + void release(List list) throws Exception; +} diff --git a/src/main/java/org/xbib/netty/http/client/pool/PoolKey.java b/src/main/java/org/xbib/netty/http/client/pool/PoolKey.java new file mode 100644 index 0000000..7f4a18d --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/pool/PoolKey.java @@ -0,0 +1,8 @@ +package org.xbib.netty.http.client.pool; + +import java.net.InetSocketAddress; + +public interface PoolKey { + + InetSocketAddress getInetSocketAddress(); +} diff --git a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java b/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java index b3f7860..26cb3ed 100644 --- a/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java +++ b/src/main/java/org/xbib/netty/http/client/pool/SimpleChannelPool.java @@ -6,9 +6,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.util.AttributeKey; import java.net.ConnectException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -19,75 +19,76 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; import java.util.logging.Logger; - -public class SimpleChannelPool implements ChannelPool { +public class SimpleChannelPool implements Pool { private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName()); private final Semaphore semaphore; - private final List nodes; + private final ChannelPoolHandler channelPoolhandler; + + private final List nodes; private final int numberOfNodes; private final int retriesPerNode; - private final Map bootstraps; + private final Map bootstraps; - private final Map> channels; + private final Map> channels; - private final Map> availableChannels; + private final Map> availableChannels; - private final Map counts; + private final Map counts; - private final Map failedCounts; + private final Map failedCounts; - private final Lock lock = new ReentrantLock(); + private final Lock lock; + + private final AttributeKey attributeKey; /** - * @param semaphore the throttle for the concurrency level control + * @param semaphore the concurrency level * @param nodes the endpoint nodes, any element may contain the port (followed after ":") * to override the defaultPort argument * @param bootstrap bootstrap instance * @param channelPoolHandler channel pool handler being notified upon new connection is created - * @param defaultPort default port used to connect (any node address from the nodes set may override this) * @param retriesPerNode the max count of the subsequent connection failures to the node before - * the node will be excluded from the pool, 0 means no limit + * the node will be excluded from the pool. If set to 0, the value is ignored. */ - public SimpleChannelPool(Semaphore semaphore, List nodes, Bootstrap bootstrap, - ChannelPoolHandler channelPoolHandler, int defaultPort, int retriesPerNode) { + public SimpleChannelPool(Semaphore semaphore, List nodes, Bootstrap bootstrap, + ChannelPoolHandler channelPoolHandler, int retriesPerNode) { this.semaphore = semaphore; + this.channelPoolhandler = channelPoolHandler; + this.nodes = nodes; + this.retriesPerNode = retriesPerNode; + this.lock = new ReentrantLock(); + this.attributeKey = AttributeKey.valueOf("poolKey"); if (nodes == null || nodes.isEmpty()) { throw new IllegalArgumentException("empty nodes array argument"); } - this.nodes = nodes; - this.retriesPerNode = retriesPerNode; this.numberOfNodes = nodes.size(); bootstraps = new HashMap<>(numberOfNodes); channels = new HashMap<>(numberOfNodes); availableChannels = new HashMap<>(numberOfNodes); counts = new HashMap<>(numberOfNodes); failedCounts = new HashMap<>(numberOfNodes); - for (String node : nodes) { - InetSocketAddress nodeAddr; - if (node.contains(":")) { - String addrParts[] = node.split(":"); - nodeAddr = new InetSocketAddress(addrParts[0], Integer.parseInt(addrParts[1])); - } else { - nodeAddr = new InetSocketAddress(node, defaultPort); - } - bootstraps.put(node, bootstrap.clone().remoteAddress(nodeAddr) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel conn) throws Exception { - if(!conn.eventLoop().inEventLoop()) { - throw new AssertionError(); - } - channelPoolHandler.channelCreated(conn); - } - })); + for (K node : nodes) { + bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress()) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + if(!channel.eventLoop().inEventLoop()) { + throw new IllegalStateException(); + } + if (channelPoolHandler != null) { + channelPoolHandler.channelCreated(channel); + } + } + })); availableChannels.put(node, new ConcurrentLinkedQueue<>()); counts.put(node, 0); failedCounts.put(node, 0); @@ -99,10 +100,10 @@ public class SimpleChannelPool implements ChannelPool { if (count > 0) { for (int i = 0; i < count; i ++) { Channel channel = connectToAnyNode(); - if(channel == null) { - throw new ConnectException("Failed to pre-create the connections to the target nodes"); + if (channel == null) { + throw new ConnectException("failed to prepare the connections"); } - String nodeAddr = channel.attr(NODE_ATTRIBUTE_KEY).get(); + K nodeAddr = channel.attr(attributeKey).get(); if (channel.isActive()) { Queue channelQueue = availableChannels.get(nodeAddr); if (channelQueue != null) { @@ -112,49 +113,117 @@ public class SimpleChannelPool implements ChannelPool { channel.close(); } } - logger.info("prepared " + count + " connections"); + logger.log(Level.FINE,"prepared " + count + " connections"); } else { throw new IllegalArgumentException("Connection count should be > 0, but got " + count); } } - private class CloseChannelListener implements ChannelFutureListener { - - private final String nodeAddr; - private final Channel conn; - - private CloseChannelListener(String nodeAddr, Channel conn) { - this.nodeAddr = nodeAddr; - this.conn = conn; - } - - @Override - public void operationComplete(ChannelFuture future) { - logger.fine("connection to " + nodeAddr + " closed"); - lock.lock(); - try { - synchronized (counts) { - if(counts.containsKey(nodeAddr)) { - counts.put(nodeAddr, counts.get(nodeAddr) - 1); - } - } - synchronized (channels) { - List nodeConns = channels.get(nodeAddr); - if(nodeConns != null) { - nodeConns.remove(conn); - } - } - semaphore.release(); - } finally { - lock.unlock(); + @Override + public Channel acquire() throws Exception { + Channel channel = null; + if (semaphore.tryAcquire()) { + if ((channel = poll()) == null) { + channel = connectToAnyNode(); } + if (channel == null) { + semaphore.release(); + throw new ConnectException(); + } + } + if (channelPoolhandler != null) { + channelPoolhandler.channelAcquired(channel); + } + return channel; + } + + @Override + public int acquire(List channels, int maxCount) throws Exception { + int availableCount = semaphore.drainPermits(); + if (availableCount == 0) { + return availableCount; + } + if (availableCount > maxCount) { + semaphore.release(availableCount - maxCount); + availableCount = maxCount; + } + Channel channel; + for (int i = 0; i < availableCount; i ++) { + if (null == (channel = poll())) { + channel = connectToAnyNode(); + } + if (channel == null) { + semaphore.release(availableCount - i); + throw new ConnectException(); + } else { + if (channelPoolhandler != null) { + channelPoolhandler.channelAcquired(channel); + } + channels.add(channel); + } + } + return availableCount; + } + + @Override + public void release(Channel channel) throws Exception { + K nodeAddr = channel.attr(attributeKey).get(); + if (channel.isActive()) { + Queue channelQueue = availableChannels.get(nodeAddr); + if (channelQueue != null) { + channelQueue.add(channel); + } + semaphore.release(); + } else { + channel.close(); + } + if (channelPoolhandler != null) { + channelPoolhandler.channelReleased(channel); + } + } + + @Override + public void release(List channels) throws Exception { + for (Channel channel : channels) { + release(channel); + } + } + + @Override + public void close() { + lock.lock(); + try { + int closedConnCount = 0; + for (K nodeAddr : availableChannels.keySet()) { + for (Channel conn : availableChannels.get(nodeAddr)) { + if (conn.isOpen()) { + conn.close(); + closedConnCount++; + } + } + } + availableChannels.clear(); + for (K nodeAddr : channels.keySet()) { + for (Channel channel : channels.get(nodeAddr)) { + if (channel != null && channel.isOpen()) { + channel.close(); + closedConnCount++; + } + } + } + channels.clear(); + bootstraps.clear(); + counts.clear(); + logger.log(Level.FINE, "closed " + closedConnCount + " connections"); + } finally { + lock.unlock(); } } private Channel connectToAnyNode() throws ConnectException { Channel channel = null; - String nodeAddr = null; - String nextNodeAddr; + K nodeAddr = null; + K nextNodeAddr; int min = Integer.MAX_VALUE; int next; int i = ThreadLocalRandom.current().nextInt(numberOfNodes); @@ -170,28 +239,28 @@ public class SimpleChannelPool implements ChannelPool { } } if (nodeAddr != null) { - logger.fine("trying connection to " + nodeAddr); + logger.log(Level.FINE, "trying connection to " + nodeAddr); try { channel = connect(nodeAddr); } catch (Exception e) { - logger.warning("failed to create a new connection to " + nodeAddr + ": " + e.toString()); + logger.log(Level.WARNING, "failed to create a new connection to " + nodeAddr + ": " + e.toString()); if (retriesPerNode > 0) { int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1; failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount); if (selectedNodeFailedConnAttemptsCount > retriesPerNode) { - logger.warning("Failed to connect to the node \"" + nodeAddr + "\" " - + selectedNodeFailedConnAttemptsCount + " times successively, " - + "excluding the node from the connection pool forever"); + logger.log(Level.WARNING, "failed to connect to the node " + nodeAddr + " " + + selectedNodeFailedConnAttemptsCount + " times, " + + "excluding the node from the connection pool"); counts.put(nodeAddr, Integer.MAX_VALUE); boolean allNodesExcluded = true; - for (String node : nodes) { + for (K node : nodes) { if (counts.get(node) < Integer.MAX_VALUE) { allNodesExcluded = false; break; } } if (allNodesExcluded) { - logger.severe("no endpoint nodes left in the connection pool"); + logger.log(Level.SEVERE, "no nodes left in the connection pool"); } } } @@ -204,20 +273,20 @@ public class SimpleChannelPool implements ChannelPool { } if (channel != null) { channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel)); - channel.attr(NODE_ATTRIBUTE_KEY).set(nodeAddr); - channels.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(channel); - synchronized(counts) { + channel.attr(attributeKey).set(nodeAddr); + channels.computeIfAbsent(nodeAddr, node -> new ArrayList<>()).add(channel); + synchronized (counts) { counts.put(nodeAddr, counts.get(nodeAddr) + 1); } if(retriesPerNode > 0) { failedCounts.put(nodeAddr, 0); } - logger.fine("new connection to " + nodeAddr + " created"); + logger.log(Level.FINE,"new connection to " + nodeAddr + " created"); } return channel; } - protected Channel connect(String addr) throws Exception { + private Channel connect(K addr) throws Exception { Bootstrap bootstrap = bootstraps.get(addr); if (bootstrap != null) { return bootstrap.connect().sync().channel(); @@ -231,9 +300,9 @@ public class SimpleChannelPool implements ChannelPool { Channel channel; for(int j = i; j < i + numberOfNodes; j ++) { channelQueue = availableChannels.get(nodes.get(j % numberOfNodes)); - if(channelQueue != null) { + if (channelQueue != null) { channel = channelQueue.poll(); - if(channel != null && channel.isActive()) { + if (channel != null && channel.isActive()) { return channel; } } @@ -241,100 +310,36 @@ public class SimpleChannelPool implements ChannelPool { return null; } - @Override - public Channel lease() throws ConnectException { - Channel conn = null; - if (semaphore.tryAcquire()) { - if (null == (conn = poll())) { - conn = connectToAnyNode(); - } - if (conn == null) { - semaphore.release(); - throw new ConnectException(); - } - } - return conn; - } + private class CloseChannelListener implements ChannelFutureListener { - @Override - public int lease(List channels, int maxCount) throws ConnectException { - int availableCount = semaphore.drainPermits(); - if (availableCount == 0) { - return availableCount; - } - if (availableCount > maxCount) { - semaphore.release(availableCount - maxCount); - availableCount = maxCount; - } - Channel conn; - for (int i = 0; i < availableCount; i ++) { - if (null == (conn = poll())) { - conn = connectToAnyNode(); - } - if (conn == null) { - semaphore.release(availableCount - i); - throw new ConnectException(); - } else { - channels.add(conn); - } - } - return availableCount; - } + private final K nodeAddr; + private final Channel channel; - @Override - public void release(Channel conn) { - String nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get(); - if( conn.isActive()) { - Queue connQueue = availableChannels.get(nodeAddr); - if (connQueue != null) { - connQueue.add(conn); - } - semaphore.release(); - } else { - conn.close(); + private CloseChannelListener(K nodeAddr, Channel channel) { + this.nodeAddr = nodeAddr; + this.channel = channel; } - } - @Override - public void release(List conns) { - String nodeAddr; - Queue connQueue; - for (Channel conn : conns) { - nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get(); - if (conn.isActive()) { - connQueue = availableChannels.get(nodeAddr); - connQueue.add(conn); - semaphore.release(); - } else { - conn.close(); - } - } - } - - @Override - public void close() { - lock.lock(); - int closedConnCount = 0; - for (String nodeAddr: availableChannels.keySet()) { - for (Channel conn: availableChannels.get(nodeAddr)) { - if (conn.isOpen()) { - conn.close(); - closedConnCount ++; + @Override + public void operationComplete(ChannelFuture future) { + logger.log(Level.FINE,"connection to " + nodeAddr + " closed"); + lock.lock(); + try { + synchronized (counts) { + if (counts.containsKey(nodeAddr)) { + counts.put(nodeAddr, counts.get(nodeAddr) - 1); + } } - } - } - availableChannels.clear(); - for (String nodeAddr: channels.keySet()) { - for (Channel conn: channels.get(nodeAddr)) { - if (conn.isOpen()) { - conn.close(); - closedConnCount ++; + synchronized (channels) { + List channels = SimpleChannelPool.this.channels.get(nodeAddr); + if (channels != null) { + channels.remove(channel); + } } + semaphore.release(); + } finally { + lock.unlock(); } } - channels.clear(); - bootstraps.clear(); - counts.clear(); - logger.fine("closed " + closedConnCount + " connections"); } } diff --git a/src/main/java/org/xbib/netty/http/client/rest/RestClient.java b/src/main/java/org/xbib/netty/http/client/rest/RestClient.java index 90ed4a8..a25870f 100644 --- a/src/main/java/org/xbib/netty/http/client/rest/RestClient.java +++ b/src/main/java/org/xbib/netty/http/client/rest/RestClient.java @@ -11,7 +11,6 @@ import org.xbib.netty.http.client.RequestBuilder; import org.xbib.netty.http.client.transport.Transport; import java.io.IOException; -import java.net.ConnectException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.logging.Logger; @@ -55,21 +54,14 @@ public class RestClient { Client client = new Client(); Transport transport = client.newTransport(HttpAddress.http1(url)); RestClient restClient = new RestClient(client, transport); - transport.setResponseListener(restClient::setResponse); - try { - transport.connect(); - } catch (InterruptedException e) { - throw new ConnectException("unable to connect to " + url); - } - transport.awaitSettings(); RequestBuilder requestBuilder = Request.builder(httpMethod); - requestBuilder.setURL(url); + requestBuilder.url(url); if (body != null && charset != null) { ByteBuf byteBuf = client.getByteBufAllocator().buffer(); byteBuf.writeCharSequence(body, charset); - requestBuilder.setContent(byteBuf); + requestBuilder.content(byteBuf); } - transport.execute(requestBuilder.build()).get(); + transport.execute(requestBuilder.build().setResponseListener(restClient::setResponse)).get(); return restClient; } diff --git a/src/main/java/org/xbib/netty/http/client/retry/BackOff.java b/src/main/java/org/xbib/netty/http/client/retry/BackOff.java new file mode 100644 index 0000000..4025363 --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/retry/BackOff.java @@ -0,0 +1,65 @@ +package org.xbib.netty.http.client.retry; + +import java.io.IOException; + +/** + * Back-off policy when retrying an operation. + */ +public interface BackOff { + + /** + * Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */ + long STOP = -1L; + + /** + * Reset to initial state. + */ + void reset() throws IOException; + + /** + * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to + * indicate that no retries should be made. + * + *

+ * Example usage: + *

+ * + *
+     long backOffMillis = backoff.nextBackOffMillis();
+     if (backOffMillis == Backoff.STOP) {
+     // do not retry operation
+     } else {
+     // sleep for backOffMillis milliseconds and retry operation
+     }
+     * 
+ */ + long nextBackOffMillis() throws IOException; + + /** + * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried + * immediately without waiting. + */ + BackOff ZERO_BACKOFF = new BackOff() { + + public void reset() { + } + + public long nextBackOffMillis() { + return 0; + } + }; + + /** + * Fixed back-off policy that always returns {@code #STOP} for {@link #nextBackOffMillis()}, + * meaning that the operation should not be retried. + */ + BackOff STOP_BACKOFF = new BackOff() { + + public void reset() { + } + + public long nextBackOffMillis() { + return STOP; + } + }; +} diff --git a/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java b/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java new file mode 100644 index 0000000..dd0638a --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/retry/ExponentialBackOff.java @@ -0,0 +1,487 @@ +package org.xbib.netty.http.client.retry; + +/** + * Implementation of {@link BackOff} that increases the back off period for each retry attempt using + * a randomization function that grows exponentially. + * + *

+ * {@link #nextBackOffMillis()} is calculated using the following formula: + *

+ * + *
+ randomized_interval =
+ retry_interval * (random value in range [1 - randomization_factor, 1 + randomization_factor])
+ * 
+ * + *

+ * In other words {@link #nextBackOffMillis()} will range between the randomization factor + * percentage below and above the retry interval. For example, using 2 seconds as the base retry + * interval and 0.5 as the randomization factor, the actual back off period used in the next retry + * attempt will be between 1 and 3 seconds. + *

+ * + *

+ * Note: max_interval caps the retry_interval and not the randomized_interval. + *

+ * + *

+ * If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the + * max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning + * {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}. + *

+ * + *

+ * Example: The default retry_interval is .5 seconds, default randomization_factor is 0.5, default + * multiplier is 1.5 and the default max_interval is 1 minute. For 10 tries the sequence will be + * (values in seconds) and assuming we go over the max_elapsed_time on the 10th try: + *

+ * + *
+ request#     retry_interval     randomized_interval
+
+ 1             0.5                [0.25,   0.75]
+ 2             0.75               [0.375,  1.125]
+ 3             1.125              [0.562,  1.687]
+ 4             1.687              [0.8435, 2.53]
+ 5             2.53               [1.265,  3.795]
+ 6             3.795              [1.897,  5.692]
+ 7             5.692              [2.846,  8.538]
+ 8             8.538              [4.269, 12.807]
+ 9            12.807              [6.403, 19.210]
+ 10           19.210              {@link BackOff#STOP}
+ * 
+ * + *

+ * Implementation is not thread-safe. + *

+ */ +public class ExponentialBackOff implements BackOff { + + /** The default initial interval value in milliseconds (0.5 seconds). */ + public static final int DEFAULT_INITIAL_INTERVAL_MILLIS = 500; + + /** + * The default randomization factor (0.5 which results in a random period ranging between 50% + * below and 50% above the retry interval). + */ + public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; + + /** The default multiplier value (1.5 which is 50% increase per back off). */ + public static final double DEFAULT_MULTIPLIER = 1.5; + + /** The default maximum back off time in milliseconds (1 minute). */ + public static final int DEFAULT_MAX_INTERVAL_MILLIS = 60000; + + /** The default maximum elapsed time in milliseconds (15 minutes). */ + public static final int DEFAULT_MAX_ELAPSED_TIME_MILLIS = 900000; + + /** The current retry interval in milliseconds. */ + private int currentIntervalMillis; + + /** The initial retry interval in milliseconds. */ + private final int initialIntervalMillis; + + /** + * The randomization factor to use for creating a range around the retry interval. + * + *

+ * A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + * above the retry interval. + *

+ */ + private final double randomizationFactor; + + /** The value to multiply the current interval with for each retry attempt. */ + private final double multiplier; + + /** + * The maximum value of the back off period in milliseconds. Once the retry interval reaches this + * value it stops increasing. + */ + private final int maxIntervalMillis; + + /** + * The system time in nanoseconds. It is calculated when an ExponentialBackOffPolicy instance is + * created and is reset when {@link #reset()} is called. + */ + private long startTimeNanos; + + /** + * The maximum elapsed time after instantiating {@link ExponentialBackOff} or calling + * {@link #reset()} after which {@link #nextBackOffMillis()} returns {@link BackOff#STOP}. + */ + private final int maxElapsedTimeMillis; + + /** Nano clock. */ + private final NanoClock nanoClock; + + /** + * Creates an instance of ExponentialBackOffPolicy using default values. + * + *

+ * To override the defaults use {@link Builder}. + *

+ * + *
    + *
  • {@code initialIntervalMillis} defaults to {@link #DEFAULT_INITIAL_INTERVAL_MILLIS}
  • + *
  • {@code randomizationFactor} defaults to {@link #DEFAULT_RANDOMIZATION_FACTOR}
  • + *
  • {@code multiplier} defaults to {@link #DEFAULT_MULTIPLIER}
  • + *
  • {@code maxIntervalMillis} defaults to {@link #DEFAULT_MAX_INTERVAL_MILLIS}
  • + *
  • {@code maxElapsedTimeMillis} defaults in {@link #DEFAULT_MAX_ELAPSED_TIME_MILLIS}
  • + *
+ */ + public ExponentialBackOff() { + this(new Builder()); + } + + /** + * @param builder builder + */ + protected ExponentialBackOff(Builder builder) { + initialIntervalMillis = builder.initialIntervalMillis; + randomizationFactor = builder.randomizationFactor; + multiplier = builder.multiplier; + maxIntervalMillis = builder.maxIntervalMillis; + maxElapsedTimeMillis = builder.maxElapsedTimeMillis; + nanoClock = builder.nanoClock; + //Preconditions.checkArgument(initialIntervalMillis > 0); + //Preconditions.checkArgument(0 <= randomizationFactor && randomizationFactor < 1); + //Preconditions.checkArgument(multiplier >= 1); + //Preconditions.checkArgument(maxIntervalMillis >= initialIntervalMillis); + //Preconditions.checkArgument(maxElapsedTimeMillis > 0); + reset(); + } + + /** Sets the interval back to the initial retry interval and restarts the timer. */ + public final void reset() { + currentIntervalMillis = initialIntervalMillis; + startTimeNanos = nanoClock.nanoTime(); + } + + public void setStartTimeNanos(long startTimeNanos) { + this.startTimeNanos = startTimeNanos; + } + + /** + * {@inheritDoc} + * + *

+ * This method calculates the next back off interval using the formula: randomized_interval = + * retry_interval +/- (randomization_factor * retry_interval) + *

+ * + *

+ * Subclasses may override if a different algorithm is required. + *

+ */ + public long nextBackOffMillis() { + // Make sure we have not gone over the maximum elapsed time. + if (getElapsedTimeMillis() > maxElapsedTimeMillis) { + return STOP; + } + int randomizedInterval = + getRandomValueFromInterval(randomizationFactor, Math.random(), currentIntervalMillis); + incrementCurrentInterval(); + return randomizedInterval; + } + + /** + * Returns a random value from the interval [randomizationFactor * currentInterval, + * randomizationFactor * currentInterval]. + */ + public static int getRandomValueFromInterval(double randomizationFactor, double random, int currentIntervalMillis) { + double delta = randomizationFactor * currentIntervalMillis; + double minInterval = currentIntervalMillis - delta; + double maxInterval = currentIntervalMillis + delta; + // Get a random value from the range [minInterval, maxInterval]. + // The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then + // we want a 33% chance for selecting either 1, 2 or 3. + return (int) (minInterval + (random * (maxInterval - minInterval + 1))); + } + + /** Returns the initial retry interval in milliseconds. */ + public final int getInitialIntervalMillis() { + return initialIntervalMillis; + } + + /** + * Returns the randomization factor to use for creating a range around the retry interval. + * + *

+ * A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + * above the retry interval. + *

+ */ + public final double getRandomizationFactor() { + return randomizationFactor; + } + + /** + * Returns the current retry interval in milliseconds. + */ + public final int getCurrentIntervalMillis() { + return currentIntervalMillis; + } + + /** + * Returns the value to multiply the current interval with for each retry attempt. + */ + public final double getMultiplier() { + return multiplier; + } + + /** + * Returns the maximum value of the back off period in milliseconds. Once the current interval + * reaches this value it stops increasing. + */ + public final int getMaxIntervalMillis() { + return maxIntervalMillis; + } + + /** + * Returns the maximum elapsed time in milliseconds. + * + *

+ * If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the + * max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning + * {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}. + *

+ */ + public final int getMaxElapsedTimeMillis() { + return maxElapsedTimeMillis; + } + + /** + * Returns the elapsed time in milliseconds since an {@link ExponentialBackOff} instance is + * created and is reset when {@link #reset()} is called. + * + *

+ * The elapsed time is computed using {@link System#nanoTime()}. + *

+ */ + public final long getElapsedTimeMillis() { + return (nanoClock.nanoTime() - startTimeNanos) / 1000000; + } + + /** + * Increments the current interval by multiplying it with the multiplier. + */ + private void incrementCurrentInterval() { + // Check for overflow, if overflow is detected set the current interval to the max interval. + if (currentIntervalMillis >= maxIntervalMillis / multiplier) { + currentIntervalMillis = maxIntervalMillis; + } else { + currentIntervalMillis *= multiplier; + } + } + + /** + * Builder for {@link ExponentialBackOff}. + * + *

+ * Implementation is not thread-safe. + *

+ */ + public static class Builder { + + /** The initial retry interval in milliseconds. */ + int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS; + + /** + * The randomization factor to use for creating a range around the retry interval. + * + *

+ * A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + * above the retry interval. + *

+ */ + double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR; + + /** The value to multiply the current interval with for each retry attempt. */ + double multiplier = DEFAULT_MULTIPLIER; + + /** + * The maximum value of the back off period in milliseconds. Once the retry interval reaches + * this value it stops increasing. + */ + int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS; + + /** + * The maximum elapsed time in milliseconds after instantiating {@link ExponentialBackOff} or + * calling {@link #reset()} after which {@link #nextBackOffMillis()} returns + * {@link BackOff#STOP}. + */ + int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS; + + /** Nano clock. */ + NanoClock nanoClock = NanoClock.SYSTEM; + + public Builder() { + } + + /** Builds a new instance of {@link ExponentialBackOff}. */ + public ExponentialBackOff build() { + return new ExponentialBackOff(this); + } + + /** + * Returns the initial retry interval in milliseconds. The default value is + * {@link #DEFAULT_INITIAL_INTERVAL_MILLIS}. + */ + public final int getInitialIntervalMillis() { + return initialIntervalMillis; + } + + /** + * Sets the initial retry interval in milliseconds. The default value is + * {@link #DEFAULT_INITIAL_INTERVAL_MILLIS}. Must be {@code > 0}. + * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public Builder setInitialIntervalMillis(int initialIntervalMillis) { + this.initialIntervalMillis = initialIntervalMillis; + return this; + } + + /** + * Returns the randomization factor to use for creating a range around the retry interval. The + * default value is {@link #DEFAULT_RANDOMIZATION_FACTOR}. + * + *

+ * A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + * above the retry interval. + *

+ * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public final double getRandomizationFactor() { + return randomizationFactor; + } + + /** + * Sets the randomization factor to use for creating a range around the retry interval. The + * default value is {@link #DEFAULT_RANDOMIZATION_FACTOR}. Must fall in the range + * {@code 0 <= randomizationFactor < 1}. + * + *

+ * A randomization factor of 0.5 results in a random period ranging between 50% below and 50% + * above the retry interval. + *

+ * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public Builder setRandomizationFactor(double randomizationFactor) { + this.randomizationFactor = randomizationFactor; + return this; + } + + /** + * Returns the value to multiply the current interval with for each retry attempt. The default + * value is {@link #DEFAULT_MULTIPLIER}. + */ + public final double getMultiplier() { + return multiplier; + } + + /** + * Sets the value to multiply the current interval with for each retry attempt. The default + * value is {@link #DEFAULT_MULTIPLIER}. Must be {@code >= 1}. + * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public Builder setMultiplier(double multiplier) { + this.multiplier = multiplier; + return this; + } + + /** + * Returns the maximum value of the back off period in milliseconds. Once the current interval + * reaches this value it stops increasing. The default value is + * {@link #DEFAULT_MAX_INTERVAL_MILLIS}. Must be {@code >= initialInterval}. + */ + public final int getMaxIntervalMillis() { + return maxIntervalMillis; + } + + /** + * Sets the maximum value of the back off period in milliseconds. Once the current interval + * reaches this value it stops increasing. The default value is + * {@link #DEFAULT_MAX_INTERVAL_MILLIS}. + * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public Builder setMaxIntervalMillis(int maxIntervalMillis) { + this.maxIntervalMillis = maxIntervalMillis; + return this; + } + + /** + * Returns the maximum elapsed time in milliseconds. The default value is + * {@link #DEFAULT_MAX_ELAPSED_TIME_MILLIS}. + * + *

+ * If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the + * max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning + * {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}. + *

+ */ + public final int getMaxElapsedTimeMillis() { + return maxElapsedTimeMillis; + } + + /** + * Sets the maximum elapsed time in milliseconds. The default value is + * {@link #DEFAULT_MAX_ELAPSED_TIME_MILLIS}. Must be {@code > 0}. + * + *

+ * If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the + * max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning + * {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}. + *

+ * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public Builder setMaxElapsedTimeMillis(int maxElapsedTimeMillis) { + this.maxElapsedTimeMillis = maxElapsedTimeMillis; + return this; + } + + /** + * Returns the nano clock. + */ + public final NanoClock getNanoClock() { + return nanoClock; + } + + /** + * Sets the nano clock ({@link NanoClock#SYSTEM} by default). + * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public Builder setNanoClock(NanoClock nanoClock) { + this.nanoClock = nanoClock; //Preconditions.checkNotNull(nanoClock); + return this; + } + } +} diff --git a/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java b/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java new file mode 100644 index 0000000..11e8e13 --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/retry/NanoClock.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.xbib.netty.http.client.retry; + +/** + * Nano clock which can be used to measure elapsed time in nanoseconds. + * + *

+ * The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations + * may be used for testing. + *

+ * + * @since 1.14 + * @author Yaniv Inbar + */ +public interface NanoClock { + + /** + * Returns the current value of the most precise available system timer, in nanoseconds for use to + * measure elapsed time, to match the behavior of {@link System#nanoTime()}. + */ + long nanoTime(); + + /** + * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}. + */ + NanoClock SYSTEM = new NanoClock() { + public long nanoTime() { + return System.nanoTime(); + } + }; +} diff --git a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index 76be0d4..617afdc 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -16,12 +16,9 @@ import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.RequestBuilder; -import org.xbib.netty.http.client.listener.CookieListener; -import org.xbib.netty.http.client.listener.ExceptionListener; -import org.xbib.netty.http.client.listener.HttpHeadersListener; -import org.xbib.netty.http.client.listener.HttpPushListener; -import org.xbib.netty.http.client.listener.HttpResponseListener; +import java.io.IOException; +import java.net.ConnectException; import java.nio.charset.MalformedInputException; import java.nio.charset.StandardCharsets; import java.nio.charset.UnmappableCharacterException; @@ -51,16 +48,6 @@ abstract class BaseTransport implements Transport { protected SortedMap requests; - protected HttpResponseListener responseListener; - - protected ExceptionListener exceptionListener; - - protected HttpHeadersListener httpHeadersListener; - - protected CookieListener cookieListener; - - protected HttpPushListener pushListener; - private Map cookieBox; BaseTransport(Client client, HttpAddress httpAddress) { @@ -75,31 +62,8 @@ abstract class BaseTransport implements Transport { } @Override - public void connect() throws InterruptedException { - channel = client.newChannel(httpAddress); - channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this); - } - - @Override - public Channel channel() { - return channel; - } - - @Override - public Transport execute(Request request) { - if (channel == null) { - try { - connect(); - awaitSettings(); - } catch (InterruptedException e) { - return this; - } - } - setResponseListener(request.getResponseListener()); - setExceptionListener(request.getExceptionListener()); - setHeadersListener(request.getHeadersListener()); - setCookieListener(request.getCookieListener()); - setPushListener(request.getPushListener()); + public Transport execute(Request request) throws IOException { + ensureConnect(); // some HTTP 1.1 servers like Elasticsearch do not understand full URIs in HTTP command line String uri = request.httpVersion().majorVersion() < 2 ? request.base().relativeReference() : request.base().toString(); @@ -136,91 +100,45 @@ abstract class BaseTransport implements Transport { */ @Override public CompletableFuture execute(Request request, - Function supplier) { + Function supplier) throws IOException { final CompletableFuture completableFuture = new CompletableFuture<>(); - request.setExceptionListener(completableFuture::completeExceptionally); + //request.setExceptionListener(completableFuture::completeExceptionally); request.setResponseListener(response -> completableFuture.complete(supplier.apply(response))); execute(request); return completableFuture; } @Override - public void close() { + public synchronized void close() { get(); if (channel != null) { channel.close(); + channel = null; } } - @Override - public void setResponseListener(HttpResponseListener responseListener) { - if (responseListener != null) { - this.responseListener = responseListener; + protected void ensureConnect() throws IOException { + if (channel == null) { + try { + channel = client.newChannel(httpAddress); + channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this); + awaitSettings(); + } catch (InterruptedException e) { + throw new ConnectException("unable to connect to " + httpAddress); + } } } - @Override - public HttpResponseListener getResponseListener() { - return responseListener; - } - - @Override - public void setHeadersListener(HttpHeadersListener httpHeadersListener) { - if (httpHeadersListener != null) { - this.httpHeadersListener = httpHeadersListener; - } - } - - @Override - public HttpHeadersListener getHeadersListener() { - return httpHeadersListener; - } - - @Override - public void setCookieListener(CookieListener cookieListener) { - if (cookieListener != null) { - this.cookieListener = cookieListener; - } - } - - @Override - public CookieListener getCookieListener() { - return cookieListener; - } - - @Override - public void setExceptionListener(ExceptionListener exceptionListener) { - if (exceptionListener != null) { - this.exceptionListener = exceptionListener; - } - } - - @Override - public ExceptionListener getExceptionListener() { - return exceptionListener; - } - - @Override - public void setPushListener(HttpPushListener pushListener) { - if (pushListener != null) { - this.pushListener = pushListener; - } - } - - @Override - public HttpPushListener getPushListener() { - return pushListener; - } - protected Request continuation(Integer streamId, FullHttpResponse httpResponse) throws URLSyntaxException { if (httpResponse == null) { return null; } + Request request = fromStreamId(streamId); + if (request == null) { + // push promise + return null; + } try { - if (streamId == null) { - streamId = requests.lastKey(); - } - Request request = requests.get(streamId); if (request.checkRedirect()) { int status = httpResponse.status().code(); switch (status) { @@ -238,20 +156,21 @@ abstract class BaseTransport implements Transport { URL redirUrl = URL.base(request.base()).resolve(location); HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod(); RequestBuilder newHttpRequestBuilder = Request.builder(method) - .setURL(redirUrl) + .url(redirUrl) .setVersion(request.httpVersion()) .setHeaders(request.headers()) - .setContent(request.content()); + .content(request.content()); + // TODO(jprante) convencience to copy pathAndQuery from one request to another request.base().getQueryParams().forEach(pair -> - newHttpRequestBuilder.addParam(pair.getFirst(), pair.getSecond()) + newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond()) ); request.cookies().forEach(newHttpRequestBuilder::addCookie); Request newHttpRequest = newHttpRequestBuilder.build(); newHttpRequest.setResponseListener(request.getResponseListener()); - newHttpRequest.setExceptionListener(request.getExceptionListener()); + //newHttpRequest.setExceptionListener(request.getExceptionListener()); newHttpRequest.setHeadersListener(request.getHeadersListener()); newHttpRequest.setCookieListener(request.getCookieListener()); - newHttpRequest.setPushListener(request.getPushListener()); + //newHttpRequest.setPushListener(request.getPushListener()); StringBuilder hostAndPort = new StringBuilder(); hostAndPort.append(redirUrl.getHost()); if (redirUrl.getPort() != null) { @@ -275,6 +194,13 @@ abstract class BaseTransport implements Transport { return null; } + protected Request fromStreamId(Integer streamId) { + if (streamId == null) { + streamId = requests.lastKey(); + } + return requests.get(streamId); + } + public void setCookieBox(Map cookieBox) { this.cookieBox = cookieBox; } diff --git a/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java index 4d3bcbe..4edb3a4 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java @@ -6,12 +6,17 @@ import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import org.xbib.net.URLSyntaxException; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; +import org.xbib.netty.http.client.listener.CookieListener; +import org.xbib.netty.http.client.listener.HttpHeadersListener; +import org.xbib.netty.http.client.listener.HttpResponseListener; +import java.io.IOException; import java.util.SortedMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; @@ -36,11 +41,6 @@ public class Http2Transport extends BaseTransport implements Transport { super(client, httpAddress); streamIdCounter = new AtomicInteger(3); streamidPromiseMap = new ConcurrentSkipListMap<>(); - } - - @Override - public void connect() throws InterruptedException { - super.connect(); settingsPromise = new CompletableFuture<>(); } @@ -86,42 +86,53 @@ public class Http2Transport extends BaseTransport implements Transport { } CompletableFuture promise = streamidPromiseMap.get(streamId); if (promise == null) { - logger.log(Level.WARNING, "message received for unknown stream id " + streamId); - if (pushListener != null) { - pushListener.onPushReceived(null, fullHttpResponse); - } + logger.log(Level.WARNING, "response received for unknown stream id " + streamId); } else { - if (responseListener != null) { - responseListener.onResponse(fullHttpResponse); - } - // forward? - try { - Request request = continuation(streamId, fullHttpResponse); - if (request != null) { - // synchronous call here - client.continuation(this, request); + Request request = fromStreamId(streamId); + if (request != null) { + HttpResponseListener responseListener = request.getResponseListener(); + if (responseListener != null) { + responseListener.onResponse(fullHttpResponse); + } + try { + request = continuation(streamId, fullHttpResponse); + if (request != null) { + // synchronous call here + client.continuation(this, request); + } + } catch (URLSyntaxException | IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); } - } catch (URLSyntaxException e) { - logger.log(Level.WARNING, e.getMessage(), e); } - // complete origin transport + // complete origin promise.complete(true); } } @Override public void headersReceived(Integer streamId, HttpHeaders httpHeaders) { - if (httpHeadersListener != null) { - httpHeadersListener.onHeaders(httpHeaders); - } - if (cookieListener != null) { - for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { - Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); - cookieListener.onCookie(cookie); + Request request = fromStreamId(streamId); + if (request != null) { + HttpHeadersListener httpHeadersListener = request.getHeadersListener(); + if (httpHeadersListener != null) { + httpHeadersListener.onHeaders(httpHeaders); + } + CookieListener cookieListener = request.getCookieListener(); + if (cookieListener != null) { + for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { + Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); + cookieListener.onCookie(cookie); + } } } } + @Override + public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) { + streamidPromiseMap.put(promisedStreamId, new CompletableFuture<>()); + requests.put(promisedStreamId, fromStreamId(streamId)); + } + @Override public void awaitResponse(Integer streamId) { if (streamId == null) { @@ -156,9 +167,6 @@ public class Http2Transport extends BaseTransport implements Transport { @Override public void fail(Throwable throwable) { - if (exceptionListener != null) { - exceptionListener.onException(throwable); - } for (CompletableFuture promise : streamidPromiseMap.values()) { promise.completeExceptionally(throwable); } diff --git a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java index 1ef81ee..1356be4 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/HttpTransport.java @@ -6,12 +6,17 @@ import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import org.xbib.net.URLSyntaxException; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; +import org.xbib.netty.http.client.listener.CookieListener; +import org.xbib.netty.http.client.listener.HttpHeadersListener; +import org.xbib.netty.http.client.listener.HttpResponseListener; +import java.io.IOException; import java.util.SortedMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; @@ -58,15 +63,19 @@ public class HttpTransport extends BaseTransport implements Transport { @Override public void responseReceived(Integer streamId, FullHttpResponse fullHttpResponse) { - if (responseListener != null) { - responseListener.onResponse(fullHttpResponse); + Request request = fromStreamId(streamId); + if (request != null) { + HttpResponseListener responseListener = request.getResponseListener(); + if (responseListener != null) { + responseListener.onResponse(fullHttpResponse); + } } try { - Request request = continuation(null, fullHttpResponse); + request = continuation(null, fullHttpResponse); if (request != null) { client.continuation(this, request); } - } catch (URLSyntaxException e) { + } catch (URLSyntaxException | IOException e) { logger.log(Level.WARNING, e.getMessage(), e); } if (!sequentialPromiseMap.isEmpty()) { @@ -79,18 +88,27 @@ public class HttpTransport extends BaseTransport implements Transport { @Override public void headersReceived(Integer streamId, HttpHeaders httpHeaders) { - if (httpHeadersListener != null) { - httpHeadersListener.onHeaders(httpHeaders); - } - for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { - Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); - addCookie(cookie); - if (cookieListener != null) { - cookieListener.onCookie(cookie); + Request request = fromStreamId(streamId); + if (request != null) { + HttpHeadersListener httpHeadersListener = request.getHeadersListener(); + if (httpHeadersListener != null) { + httpHeadersListener.onHeaders(httpHeaders); + } + for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { + Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); + addCookie(cookie); + CookieListener cookieListener = request.getCookieListener(); + if (cookieListener != null) { + cookieListener.onCookie(cookie); + } } } } + @Override + public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) { + } + @Override public void awaitResponse(Integer streamId) { if (streamId == null) { @@ -125,11 +143,9 @@ public class HttpTransport extends BaseTransport implements Transport { @Override public void fail(Throwable throwable) { - if (exceptionListener != null) { - exceptionListener.onException(throwable); - } for (CompletableFuture promise : sequentialPromiseMap.values()) { promise.completeExceptionally(throwable); } } + } diff --git a/src/main/java/org/xbib/netty/http/client/transport/Transport.java b/src/main/java/org/xbib/netty/http/client/transport/Transport.java index 85a266d..86aec30 100644 --- a/src/main/java/org/xbib/netty/http/client/transport/Transport.java +++ b/src/main/java/org/xbib/netty/http/client/transport/Transport.java @@ -4,16 +4,13 @@ import io.netty.channel.Channel; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import io.netty.util.AttributeKey; import org.xbib.netty.http.client.HttpAddress; import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.listener.CookieListener; -import org.xbib.netty.http.client.listener.ExceptionListener; -import org.xbib.netty.http.client.listener.HttpHeadersListener; -import org.xbib.netty.http.client.listener.HttpPushListener; -import org.xbib.netty.http.client.listener.HttpResponseListener; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -24,13 +21,9 @@ public interface Transport { HttpAddress httpAddress(); - void connect() throws InterruptedException; + Transport execute(Request request) throws IOException; - Transport execute(Request request); - - CompletableFuture execute(Request request, Function supplier); - - Channel channel(); + CompletableFuture execute(Request request, Function supplier) throws IOException; Integer nextStream(); @@ -38,26 +31,6 @@ public interface Transport { void awaitSettings(); - void setResponseListener(HttpResponseListener responseListener); - - HttpResponseListener getResponseListener(); - - void setExceptionListener(ExceptionListener exceptionListener); - - ExceptionListener getExceptionListener(); - - void setHeadersListener(HttpHeadersListener headersListener); - - HttpHeadersListener getHeadersListener(); - - void setPushListener(HttpPushListener pushListener); - - HttpPushListener getPushListener(); - - void setCookieListener(CookieListener cookieListener); - - CookieListener getCookieListener(); - void setCookieBox(Map cookieBox); Map getCookieBox(); @@ -66,6 +39,8 @@ public interface Transport { void headersReceived(Integer streamId, HttpHeaders httpHeaders); + void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers); + void awaitResponse(Integer streamId); Transport get(); diff --git a/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java b/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java deleted file mode 100644 index 8b1164c..0000000 --- a/src/test/java/org/xbib/netty/http/client/test/AkamaiTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.xbib.netty.http.client.test; - -import org.junit.Ignore; -import org.junit.Test; -import org.xbib.netty.http.client.Client; -import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.test.LoggingBase; - -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; -import java.util.logging.Logger; - -@Ignore -public class AkamaiTest extends LoggingBase { - - private static final Logger logger = Logger.getLogger(""); - - /** - * h2_demo_frame.html fails with: - * 2018-02-27 23:43:32.048 INFORMATION [client] io.netty.handler.codec.http2.Http2FrameLogger - * logRstStream [id: 0x4fe29f1e, L:/192.168.178.23:49429 - R:http2.akamai.com/104.94.191.203:443] - * INBOUND RST_STREAM: streamId=2 errorCode=8 - * 2018-02-27 23:43:32.049 SCHWERWIEGEND [] org.xbib.netty.http.client.test.a.AkamaiTest lambda$testAkamaiHttps$0 - * HTTP/2 to HTTP layer caught stream reset - * io.netty.handler.codec.http2.Http2Exception$StreamException: HTTP/2 to HTTP layer caught stream reset - * - * TODO(jprante) catch all promised pushes - */ - @Test - public void testAkamaiHttps() { - Client client = new Client(); - try { - Request request = Request.get() - //.setURL("https://http2.akamai.com/demo/h2_demo_frame.html") - .setURL("https://http2.akamai.com/") - .setVersion("HTTP/2.0") - .build() - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)) - .setResponseListener(fullHttpResponse -> { - String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); - logger.log(Level.INFO, "status = " + fullHttpResponse.status() - + " response body = " + response); - }) - .setPushListener((requestHeaders, fullHttpResponse) -> { - String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); - logger.log(Level.INFO, "received push: request headers = " + requestHeaders - + " status = " + fullHttpResponse.status() - + " response headers = " + fullHttpResponse.headers().entries() - + " response body = " + response - ); - }); - client.execute(request).get(); - } finally { - client.shutdownGracefully(); - } - } -} diff --git a/src/test/java/org/xbib/netty/http/client/test/ClientTest.java b/src/test/java/org/xbib/netty/http/client/test/ClientTest.java deleted file mode 100644 index 010e015..0000000 --- a/src/test/java/org/xbib/netty/http/client/test/ClientTest.java +++ /dev/null @@ -1,181 +0,0 @@ -package org.xbib.netty.http.client.test; - -import io.netty.handler.codec.http.HttpMethod; -import org.junit.Test; -import org.xbib.netty.http.client.Client; -import org.xbib.netty.http.client.HttpAddress; -import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.transport.Transport; - -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class ClientTest { - - private static final Logger logger = Logger.getLogger(ClientTest.class.getName()); - - @Test - public void testHttp1() throws Exception { - Client client = new Client(); - try { - Transport transport = client.newTransport(HttpAddress.http1("fl.hbz-nrw.de")); - transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())); - transport.connect(); - transport.awaitSettings(); - simpleRequest(transport); - transport.get(); - transport.close(); - } finally { - client.shutdown(); - } - } - - @Test - public void testHttp1ParallelRequests() { - Client client = new Client(); - try { - Request request1 = Request.builder(HttpMethod.GET) - .setURL("http://fl.hbz-nrw.de").setVersion("HTTP/1.1") - .build() - .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())); - Request request2 = Request.builder(HttpMethod.GET) - .setURL("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1") - .build() - .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())); - - client.execute(request1); - client.execute(request2); - - } finally { - client.shutdownGracefully(); - } - } - - @Test - public void testHttp2() throws Exception { - String host = "webtide.com"; - Client client = new Client(); - client.logDiagnostics(Level.INFO); - try { - Transport transport = client.newTransport(HttpAddress.http2(host)); - transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())); - transport.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())); - transport.connect(); - transport.awaitSettings(); - simpleRequest(transport); - transport.get(); - transport.close(); - } finally { - client.shutdown(); - } - } - - @Test - public void testHttp2Request() { - //String url = "https://webtide.com"; - String url = "https://http2-push.io"; - // TODO register push announces into promises in order to wait for them all. - Client client = new Client(); - try { - Request request = Request.builder(HttpMethod.GET) - .setURL(url).setVersion("HTTP/2.0") - .build() - .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())) - .setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " + - msg.headers().entries() + - msg.content().toString(StandardCharsets.UTF_8)) - ); - client.execute(request).get(); - - } finally { - client.shutdownGracefully(); - } - } - - @Test - public void testHttp2TwoRequestsOnSameConnection() { - Client client = new Client(); - try { - Request request1 = Request.builder(HttpMethod.GET) - .setURL("https://webtide.com").setVersion("HTTP/2.0") - .build() - .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())) - .setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " + - msg.headers().entries() - //msg.content().toString(StandardCharsets.UTF_8)) - )); - - Request request2 = Request.builder(HttpMethod.GET) - .setURL("https://webtide.com/why-choose-jetty/").setVersion("HTTP/2.0") - .build() - .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())) - .setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " + - msg.headers().entries() + - //msg.content().toString(StandardCharsets.UTF_8) + - " status=" + msg.status().code())); - - client.execute(request1).execute(request2); - - } finally { - client.shutdownGracefully(); - } - } - - @Test - public void testTwoTransports() throws Exception { - Client client = Client.builder().enableDebug().build(); - try { - Transport transport = client.newTransport(HttpAddress.http1("xbib.org")); - transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.content().toString(StandardCharsets.UTF_8))); - transport.connect(); - transport.awaitSettings(); - simpleRequest(transport); - transport.get(); - transport.close(); - - transport = client.newTransport(HttpAddress.http2("google.com")); - transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " + - msg.content().toString(StandardCharsets.UTF_8))); - transport.connect(); - transport.awaitSettings(); - simpleRequest(transport); - transport.get(); - transport.close(); - } finally { - client.shutdown(); - } - } - - private void simpleRequest(Transport transport) { - transport.execute(Request.builder(HttpMethod.GET) - .setVersion(transport.httpAddress().getVersion()) - .setURL(transport.httpAddress().base()).build()); - } - -} diff --git a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java index a9d42f9..8aa5261 100644 --- a/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/CompletableFutureTest.java @@ -5,6 +5,7 @@ import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -19,23 +20,28 @@ public class CompletableFutureTest { * Get some weird content from one URL and post it to another URL, by composing completable futures. */ @Test - public void testComposeCompletableFutures() { + public void testComposeCompletableFutures() throws IOException { Client client = new Client(); try { final Function httpResponseStringFunction = response -> response.content().toString(StandardCharsets.UTF_8); Request request = Request.get() - .setURL("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml") + .url("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml") .build(); CompletableFuture completableFuture = client.execute(request, httpResponseStringFunction) .exceptionally(Throwable::getMessage) .thenCompose(content -> { logger.log(Level.INFO, content); // POST is not allowed, we don't care - return client.execute(Request.post() - .setURL("http://google.com/") - .addParam("query", content) - .build(), httpResponseStringFunction); + try { + return client.execute(Request.post() + .url("http://google.com/") + .addParameter("query", content) + .build(), httpResponseStringFunction); + } catch (IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); + return null; + } }); String result = completableFuture.join(); logger.log(Level.INFO, "completablefuture result = " + result); diff --git a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java index 1c15f5e..3e974dc 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ConscryptTest.java @@ -5,6 +5,7 @@ import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.logging.Level; import java.util.logging.Logger; @@ -14,7 +15,7 @@ public class ConscryptTest extends LoggingBase { private static final Logger logger = Logger.getLogger(""); @Test - public void testConscrypt() { + public void testConscrypt() throws IOException { Client client = Client.builder() .enableDebug() .setJdkSslProvider() @@ -23,10 +24,9 @@ public class ConscryptTest extends LoggingBase { logger.log(Level.INFO, client.getClientConfig().toString()); try { Request request = Request.get() - .setURL("https://fl-test.hbz-nrw.de") + .url("https://fl-test.hbz-nrw.de") .setVersion("HTTP/2.0") .build() - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)) .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() diff --git a/src/test/java/org/xbib/netty/http/client/test/HttpBinTest.java b/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java similarity index 74% rename from src/test/java/org/xbib/netty/http/client/test/HttpBinTest.java rename to src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java index 89a90d2..d0970b2 100644 --- a/src/test/java/org/xbib/netty/http/client/test/HttpBinTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/CookieSetterHttpBinTest.java @@ -4,15 +4,16 @@ import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.logging.Level; import java.util.logging.Logger; /** */ -public class HttpBinTest extends LoggingBase { +public class CookieSetterHttpBinTest extends LoggingBase { - private static final Logger logger = Logger.getLogger(HttpBinTest.class.getName()); + private static final Logger logger = Logger.getLogger(CookieSetterHttpBinTest.class.getName()); /** * Test httpbin.org "Set-Cookie:" header after redirection of URL. @@ -28,15 +29,14 @@ public class HttpBinTest extends LoggingBase { * @throws Exception */ @Test - public void testHttpBinCookies() { + public void testHttpBinCookies() throws IOException { Client client = new Client(); try { Request request = Request.get() - .setURL("http://httpbin.org/cookies/set?name=value") + .url("http://httpbin.org/cookies/set?name=value") .build() - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)) - .setCookieListener(cookie -> logger.log(Level.INFO, "this is the cookie " + cookie.toString())) - .setHeadersListener(headers -> logger.log(Level.INFO, headers.toString())) + .setCookieListener(cookie -> logger.log(Level.INFO, "this is the cookie: " + cookie.toString())) + .setHeadersListener(headers -> logger.log(Level.INFO, "headers = " + headers.entries().toString())) .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); diff --git a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java index 965843a..7e53223 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java @@ -6,6 +6,7 @@ import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; import org.xbib.netty.http.client.transport.Transport; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -21,17 +22,16 @@ public class ElasticsearchTest extends LoggingBase { private static final Logger logger = Logger.getLogger(ElasticsearchTest.class.getName()); @Test - public void testElasticsearchCreateDocument() { + public void testElasticsearchCreateDocument() throws IOException { Client client = new Client(); try { - Request request = Request.put().setURL("http://localhost:9200/test/test/1") + Request request = Request.put().url("http://localhost:9200/test/test/1") .json("{\"text\":\"Hello World\"}") .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); - }) - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)); + }); client.execute(request); } finally { client.shutdownGracefully(); @@ -39,17 +39,16 @@ public class ElasticsearchTest extends LoggingBase { } @Test - public void testElasticsearchMatchQuery() { + public void testElasticsearchMatchQuery() throws IOException { Client client = new Client(); try { - Request request = Request.post().setURL("http://localhost:9200/test/_search") + Request request = Request.post().url("http://localhost:9200/test/_search") .json("{\"query\":{\"match\":{\"text\":\"Hello World\"}}}") .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); - }) - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)); + }); client.execute(request).get(); } finally { client.shutdownGracefully(); @@ -57,7 +56,7 @@ public class ElasticsearchTest extends LoggingBase { } @Test - public void testElasticsearchConcurrent() { + public void testElasticsearchConcurrent() throws IOException { Client client = Client.builder().setReadTimeoutMillis(20000).build(); int max = 1000; try { @@ -78,15 +77,14 @@ public class ElasticsearchTest extends LoggingBase { private Request newRequest() { return Request.post() - .setURL("http://localhost:9200/test/_search") + .url("http://localhost:9200/test/_search") .json("{\"query\":{\"match\":{\"text\":\"Hello World\"}}}") .addHeader("connection", "keep-alive") .build() .setResponseListener(fullHttpResponse -> logger.log(Level.FINE, "status = " + fullHttpResponse.status() + " counter = " + count.incrementAndGet() + - " response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8))) - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)); + " response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8))); } private final AtomicInteger count = new AtomicInteger(); diff --git a/src/test/java/org/xbib/netty/http/client/test/Http1Test.java b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java new file mode 100644 index 0000000..f01e312 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/Http1Test.java @@ -0,0 +1,76 @@ +package org.xbib.netty.http.client.test; + +import io.netty.handler.codec.http.HttpMethod; +import org.junit.Test; +import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.Request; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Http1Test { + + private static final Logger logger = Logger.getLogger(Http1Test.class.getName()); + + @Test + public void testHttp1() throws Exception { + Client client = new Client(); + try { + Request request = Request.get().url("http://fl.hbz-nrw.de").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + client.execute(request).get(); + } finally { + client.shutdown(); + } + } + + @Test + public void testHttp1ParallelRequests() throws IOException { + Client client = new Client(); + try { + Request request1 = Request.builder(HttpMethod.GET) + .url("http://fl.hbz-nrw.de").setVersion("HTTP/1.1") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + Request request2 = Request.builder(HttpMethod.GET) + .url("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + + client.execute(request1); + client.execute(request2); + + } finally { + client.shutdownGracefully(); + } + } + + @Test + public void testTwoTransports() throws Exception { + Client client = Client.builder().enableDebug().build(); + try { + Request request1 = Request.get().url("http://xbib.org").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.content().toString(StandardCharsets.UTF_8))); + client.execute(request1).get(); + + Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.content().toString(StandardCharsets.UTF_8))); + client.execute(request2).get(); + } finally { + client.shutdown(); + } + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/Http2Test.java b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java new file mode 100644 index 0000000..1c2d2ac --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/Http2Test.java @@ -0,0 +1,104 @@ +package org.xbib.netty.http.client.test; + +import io.netty.handler.codec.http.HttpMethod; +import org.junit.Ignore; +import org.junit.Test; +import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.Request; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Http2Test { + + private static final Logger logger = Logger.getLogger(Http2Test.class.getName()); + + /** + */ + @Test + public void testAkamai() throws IOException { + Client client = Client.builder().enableDebug().build(); + try { + Request request = Request.get() + .url("https://http2.akamai.com/demo/h2_demo_frame.html") + //.url("https://http2.akamai.com/") + .setVersion("HTTP/2.0") + .build() + .setResponseListener(fullHttpResponse -> { + String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); + logger.log(Level.INFO, "status = " + fullHttpResponse.status() + + " response body = " + response); + }); + client.execute(request).get(); + } finally { + client.shutdownGracefully(); + } + } + + @Test + public void testWebtide() throws Exception { + Client client = Client.builder().enableDebug().build(); + client.logDiagnostics(Level.INFO); + try { + Request request = Request.get().url("https://webtide.com").setVersion("HTTP/2.0").build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + client.execute(request).get(); + } finally { + client.shutdown(); + } + } + + @Test + public void testHttp2PushIO() throws IOException { + //String url = "https://webtide.com"; + String url = "https://http2-push.io"; + // TODO register push announces into promises in order to wait for them all. + Client client = Client.builder().enableDebug().build(); + try { + Request request = Request.builder(HttpMethod.GET) + .url(url).setVersion("HTTP/2.0") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + client.execute(request).get(); + + } finally { + client.shutdownGracefully(); + } + } + + @Test + public void testWebtideTwoRequestsOnSameConnection() { + Client client = new Client(); + try { + Request request1 = Request.builder(HttpMethod.GET) + .url("https://webtide.com").setVersion("HTTP/2.0") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + + Request request2 = Request.builder(HttpMethod.GET) + .url("https://webtide.com/why-choose-jetty/").setVersion("HTTP/2.0") + .build() + .setResponseListener(msg -> logger.log(Level.INFO, "got response: " + + msg.headers().entries() + + //msg.content().toString(StandardCharsets.UTF_8) + + " status=" + msg.status().code())); + + client.execute(request1).execute(request2); + } catch (IOException e) { + // + } finally { + client.shutdownGracefully(); + } + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java b/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java new file mode 100644 index 0000000..c649630 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/RequestBuilderTest.java @@ -0,0 +1,19 @@ +package org.xbib.netty.http.client.test; + +import io.netty.handler.codec.http.HttpMethod; +import org.junit.Test; +import org.xbib.netty.http.client.Request; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class RequestBuilderTest { + + private static final Logger logger = Logger.getLogger(RequestBuilderTest.class.getName()); + + @Test + public void testSimpleRequest() { + Request request = Request.builder(HttpMethod.GET).build(); + logger.log(Level.INFO, request.toString()); + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/URITest.java b/src/test/java/org/xbib/netty/http/client/test/URITest.java index b68a66b..a5a8cdb 100644 --- a/src/test/java/org/xbib/netty/http/client/test/URITest.java +++ b/src/test/java/org/xbib/netty/http/client/test/URITest.java @@ -25,13 +25,13 @@ public class URITest { @Test public void testRequestURIs() { RequestBuilder httpRequestBuilder = Request.get(); - httpRequestBuilder.setURL("https://localhost").path("/path"); + httpRequestBuilder.url("https://localhost").uri("/path"); assertEquals("/path", httpRequestBuilder.build().relativeUri()); - httpRequestBuilder.path("/foobar"); + httpRequestBuilder.uri("/foobar"); assertEquals("/foobar", httpRequestBuilder.build().relativeUri()); - httpRequestBuilder.path("/path1?a=b"); + httpRequestBuilder.uri("/path1?a=b"); assertEquals("/path1?a=b", httpRequestBuilder.build().relativeUri()); - httpRequestBuilder.path("/path2?c=d"); + httpRequestBuilder.uri("/path2?c=d"); assertEquals("/path2?c=d", httpRequestBuilder.build().relativeUri()); } } diff --git a/src/test/java/org/xbib/netty/http/client/test/XbibTest.java b/src/test/java/org/xbib/netty/http/client/test/XbibTest.java index ba75b2d..4584329 100644 --- a/src/test/java/org/xbib/netty/http/client/test/XbibTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/XbibTest.java @@ -5,8 +5,8 @@ import io.netty.handler.proxy.HttpProxyHandler; import org.junit.Test; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Request; -import org.xbib.netty.http.client.test.LoggingBase; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; @@ -19,10 +19,10 @@ public class XbibTest extends LoggingBase { private static final Logger logger = Logger.getLogger(""); @Test - public void testXbibOrgWithDefaults() { + public void testXbibOrgWithDefaults() throws IOException { Client client = new Client(); try { - Request request = Request.get().setURL("http://xbib.org") + Request request = Request.get().url("http://xbib.org") .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); @@ -35,21 +35,28 @@ public class XbibTest extends LoggingBase { } @Test - public void testXbibOrgWithCompletableFuture() { + public void testXbibOrgWithCompletableFuture() throws IOException { Client httpClient = Client.builder() .setTcpNodelay(true) .build(); try { final Function httpResponseStringFunction = response -> response.content().toString(StandardCharsets.UTF_8); - Request request = Request.get().setURL("http://xbib.org") + Request request = Request.get().url("http://xbib.org") .build(); final CompletableFuture completableFuture = httpClient.execute(request, httpResponseStringFunction) .exceptionally(Throwable::getMessage) - .thenCompose(content -> httpClient.execute(Request.post() - .setURL("http://google.de") - .addParam("query", content) - .build(), httpResponseStringFunction)); + .thenCompose(content -> { + try { + return httpClient.execute(Request.post() + .url("http://google.de") + .addParameter("query", content) + .build(), httpResponseStringFunction); + } catch (IOException e) { + logger.log(Level.WARNING, e.getMessage(), e); + return null; + } + }); String result = completableFuture.join(); logger.info("result = " + result); } finally { @@ -58,7 +65,7 @@ public class XbibTest extends LoggingBase { } @Test - public void testXbibOrgWithProxy() { + public void testXbibOrgWithProxy() throws IOException { Client httpClient = Client.builder() .setHttpProxyHandler(new HttpProxyHandler(new InetSocketAddress("80.241.223.251", 8080))) .setConnectTimeoutMillis(30000) @@ -66,13 +73,12 @@ public class XbibTest extends LoggingBase { .build(); try { httpClient.execute(Request.get() - .setURL("http://xbib.org") + .url("http://xbib.org") .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); - }) - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))) + })) .get(); } finally { httpClient.shutdownGracefully(); @@ -80,19 +86,18 @@ public class XbibTest extends LoggingBase { } @Test - public void testXbibOrgWithVeryShortReadTimeout() { + public void testXbibOrgWithVeryShortReadTimeout() throws IOException { Client httpClient = Client.builder() .setReadTimeoutMillis(50) .build(); try { httpClient.execute(Request.get() - .setURL("http://xbib.org") + .url("http://xbib.org") .build() .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); - }) - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))) + })) .get(); } finally { httpClient.shutdownGracefully(); @@ -100,14 +105,13 @@ public class XbibTest extends LoggingBase { } @Test - public void testXbibTwoSequentialRequests() { + public void testXbibTwoSequentialRequests() throws IOException { Client httpClient = new Client(); try { httpClient.execute(Request.get() .setVersion("HTTP/1.1") - .setURL("http://xbib.org") + .url("http://xbib.org") .build() - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)) .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); @@ -116,9 +120,8 @@ public class XbibTest extends LoggingBase { httpClient.execute(Request.get() .setVersion("HTTP/1.1") - .setURL("http://xbib.org") + .url("http://xbib.org") .build() - .setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)) .setResponseListener(fullHttpResponse -> { String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java new file mode 100644 index 0000000..8c04cf4 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/pool/EpollTest.java @@ -0,0 +1,132 @@ +package org.xbib.netty.http.client.test.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.socket.SocketChannel; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.xbib.netty.http.client.HttpAddress; +import org.xbib.netty.http.client.pool.Pool; +import org.xbib.netty.http.client.pool.SimpleChannelPool; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class EpollTest { + + private static final Logger logger = Logger.getLogger(EpollTest.class.getName()); + + private static final int CONCURRENCY = 10; + + private static final List NODES = + Collections.singletonList(HttpAddress.http1("localhost", 12345)); + + private static final long TEST_TIME_SECONDS = 100; + + private static final int ATTEMPTS = 10_000; + + private static final int FAIL_EVERY_ATTEMPT = 10; + + private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000); + + private MockEpollServer mockEpollServer; + + private Pool channelPool; + + private EventLoopGroup eventLoopGroup; + + @Before + public void setUp() throws Exception { + mockEpollServer = new MockEpollServer(12345, FAIL_EVERY_ATTEMPT); + Semaphore semaphore = new Semaphore(CONCURRENCY); + eventLoopGroup = new EpollEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(EpollSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) { + socketChannel.pipeline().addLast(new DummyClientChannelHandler()); + } + }) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.TCP_NODELAY, true); + channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0); + channelPool.prepare(CONCURRENCY); + } + + @After + public void tearDown() throws Exception { + channelPool.close(); + eventLoopGroup.shutdownGracefully(); + mockEpollServer.close(); + } + + @Ignore + @Test + public void testPoolEpoll() throws Exception { + LongAdder longAdder = new LongAdder(); + ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY); + for(int i = 0; i < CONCURRENCY; i ++) { + executor.submit(() -> { + Channel channel; + for(int j = 0; j < ATTEMPTS; j ++) { + try { + while ((channel = channelPool.acquire()) == null) { + Thread.sleep(1); // very short? + } + channel.writeAndFlush(PAYLOAD.retain()).sync(); + channelPool.release(channel); + longAdder.increment(); + } catch (InterruptedException e) { + break; + } catch (Throwable cause) { + logger.log(Level.WARNING, cause.getMessage(), cause); + } + } + }); + } + executor.shutdown(); + executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS); + assertTrue(executor.isTerminated()); + assertEquals(CONCURRENCY * ATTEMPTS, longAdder.sum(), + 2 * CONCURRENCY * ATTEMPTS / FAIL_EVERY_ATTEMPT); + } + + class DummyClientChannelHandler extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + logger.log(Level.WARNING, cause.getMessage(), cause); + } + } + +} diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/MockEpollServer.java b/src/test/java/org/xbib/netty/http/client/test/pool/MockEpollServer.java new file mode 100644 index 0000000..04c3891 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/pool/MockEpollServer.java @@ -0,0 +1,64 @@ +package org.xbib.netty.http.client.test.pool; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.socket.SocketChannel; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class MockEpollServer implements Closeable { + + private static final Logger logger = Logger.getLogger(MockEpollServer.class.getName()); + + private final EventLoopGroup dispatchGroup; + + private final EventLoopGroup workerGroup; + + private final ChannelFuture bindFuture; + + private final AtomicLong reqCounter; + + public MockEpollServer(int port, int dropEveryRequest) throws InterruptedException { + dispatchGroup = new EpollEventLoopGroup(); + workerGroup = new EpollEventLoopGroup(); + reqCounter = new AtomicLong(0); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(dispatchGroup, workerGroup) + .channel(EpollServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + if (dropEveryRequest > 0) { + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + if (reqCounter.incrementAndGet() % dropEveryRequest == 0) { + Channel channel = ctx.channel(); + logger.log(Level.INFO,"dropping the connection " + channel); + channel.close(); + } + } + }); + } + } + }); + bindFuture = bootstrap.bind(port).sync(); + } + + @Override + public void close() { + bindFuture.channel().close(); + workerGroup.shutdownGracefully(); + dispatchGroup.shutdownGracefully(); + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/MockNioServer.java b/src/test/java/org/xbib/netty/http/client/test/pool/MockNioServer.java new file mode 100644 index 0000000..3bc5a82 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/pool/MockNioServer.java @@ -0,0 +1,62 @@ +package org.xbib.netty.http.client.test.pool; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class MockNioServer implements Closeable { + + private static final Logger logger = Logger.getLogger(MockNioServer.class.getName()); + + private final EventLoopGroup dispatchGroup; + + private final EventLoopGroup workerGroup; + + private final ChannelFuture bindFuture; + + private final AtomicLong reqCounter; + + public MockNioServer(int port, int dropEveryRequest) throws InterruptedException { + dispatchGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + reqCounter = new AtomicLong(0); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(dispatchGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + if (reqCounter.incrementAndGet() % dropEveryRequest == 0) { + Channel channel = ctx.channel(); + logger.log(Level.INFO, "dropping the connection " + channel); + channel.close(); + } + } + }); + } + }); + bindFuture = bootstrap.bind(port).sync(); + } + + @Override + public void close() { + bindFuture.channel().close(); + workerGroup.shutdownGracefully(); + dispatchGroup.shutdownGracefully(); + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java new file mode 100644 index 0000000..e65e767 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/pool/NioTest.java @@ -0,0 +1,128 @@ +package org.xbib.netty.http.client.test.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.xbib.netty.http.client.HttpAddress; +import org.xbib.netty.http.client.pool.Pool; +import org.xbib.netty.http.client.pool.SimpleChannelPool; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class NioTest { + + private static final Logger logger = Logger.getLogger(NioTest.class.getName()); + + private static final int CONCURRENCY = 10; + + private static final List NODES = + Collections.singletonList(HttpAddress.http1("localhost", 12345)); + + private static final long TEST_TIME_SECONDS = 100; + + private static final int ATTEMPTS = 10_000; + + private static final int FAIL_EVERY_ATTEMPT = 10; + + private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000); + + private MockNioServer mockNioServer; + + private Pool channelPool; + + private EventLoopGroup eventLoopGroup; + + @Before + public void setUp() throws Exception { + mockNioServer = new MockNioServer(12345, FAIL_EVERY_ATTEMPT); + Semaphore semaphore = new Semaphore(CONCURRENCY); + eventLoopGroup = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) { + socketChannel.pipeline().addLast(new DummyClientChannelHandler()); + } + }) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.TCP_NODELAY, true); + channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0); + channelPool.prepare(CONCURRENCY); + } + + @After + public void tearDown() throws Exception { + channelPool.close(); + eventLoopGroup.shutdownGracefully(); + mockNioServer.close(); + } + + @Test + public void testPoolNio() throws Exception { + LongAdder longAdder = new LongAdder(); + ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY); + for(int i = 0; i < CONCURRENCY; i ++) { + executor.submit(() -> { + Channel channel; + for(int j = 0; j < ATTEMPTS; j ++) { + try { + while ((channel = channelPool.acquire()) == null) { + Thread.sleep(1); + } + channel.writeAndFlush(PAYLOAD.retain()).sync(); + channelPool.release(channel); + longAdder.increment(); + } catch (InterruptedException e) { + break; + } catch (Throwable cause) { + logger.log(Level.WARNING, cause.getMessage(), cause); + } + } + }); + } + executor.shutdown(); + executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS); + assertTrue(executor.isTerminated()); + assertEquals(CONCURRENCY * ATTEMPTS, longAdder.sum(), + 2 * CONCURRENCY * ATTEMPTS / FAIL_EVERY_ATTEMPT); + } + + class DummyClientChannelHandler extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + logger.log(Level.WARNING, cause.getMessage(), cause); + } + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java b/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java new file mode 100644 index 0000000..9351230 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/pool/SimplePoolTest.java @@ -0,0 +1,124 @@ +package org.xbib.netty.http.client.test.pool; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.netty.util.AttributeKey; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.xbib.netty.http.client.HttpAddress; +import org.xbib.netty.http.client.pool.Pool; +import org.xbib.netty.http.client.pool.SimpleChannelPool; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class SimplePoolTest { + + private static final Logger logger = Logger.getLogger(SimplePoolTest.class.getName()); + + private static final int TEST_STEP_TIME_SECONDS = 50; + + private static final int BATCH_SIZE = 0x1000; + + private int nodeCount; + + private ConcurrentMap nodeFreq = new ConcurrentHashMap<>(); + + @Parameterized.Parameters + public static Collection generateData() { + return Arrays.asList(new Object[][] { + {1, 1}, + {10, 1}, {10, 2}, {10, 5}, {10, 10}, + {100, 1}, {100, 2}, {100, 5}, {100, 10}, + {1000, 1}, {1000, 2}, {1000, 5}, {1000, 10} + }); + } + + public SimplePoolTest(int concurrencyLevel, int nodeCount) { + this.nodeCount = nodeCount; + List nodes = new ArrayList<>(); + for (int i = 0; i < nodeCount; i ++) { + nodes.add(HttpAddress.http1("localhost" + i)); + } + try (Pool pool = new SimpleChannelPool<>(new Semaphore(concurrencyLevel), nodes, new Bootstrap(), + null, 0)) { + int n = Runtime.getRuntime().availableProcessors(); + ExecutorService executorService = Executors.newFixedThreadPool(n); + for(int i = 0; i < n; i ++) { + executorService.submit(() -> { + Thread currThread = Thread.currentThread(); + List channels = new ArrayList<>(BATCH_SIZE); + int j; + int k; + Channel channel; + try { + while (!currThread.isInterrupted()) { + for (j = 0; j < BATCH_SIZE; j ++) { + channel = pool.acquire(); + if (channel == null) { + break; + } + AttributeKey attributeKey = AttributeKey.valueOf("poolKey"); + nodeFreq.computeIfAbsent(channel.attr(attributeKey).get(), node -> new LongAdder()).increment(); + channels.add(channel); + } + for (k = 0; k < j; k ++) { + pool.release(channels.get(k)); + } + channels.clear(); + } + } catch (Exception ignored) { + // + } + }); + } + executorService.shutdown(); + try { + executorService.awaitTermination(TEST_STEP_TIME_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + executorService.shutdownNow(); + } catch (Throwable t) { + logger.log(Level.WARNING, t.getMessage(), t); + } finally { + long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum(); + logger.log(Level.INFO, "concurrency = " + concurrencyLevel + ", nodes = " + nodeCount + " -> rate: " + + connCountSum / TEST_STEP_TIME_SECONDS); + } + } + + @Test + public void testNodeFrequency() { + if (nodeCount > 1) { + long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum(); + long avgConnCountPerNode = connCountSum / nodeCount; + for (HttpAddress nodeAddr: nodeFreq.keySet()) { + assertTrue(nodeFreq.get(nodeAddr).sum() > 0); + assertEquals("Node count: " + nodeCount + ", node: " + nodeAddr + + ", expected connection count: " + avgConnCountPerNode + ", actual: " + + nodeFreq.get(nodeAddr).sum(), + avgConnCountPerNode, nodeFreq.get(nodeAddr).sum(), 1.5 * avgConnCountPerNode); + } + } else { + assertTrue(true); + } + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java b/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java new file mode 100644 index 0000000..3b8e4c4 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/retry/ExponentialBackOffTest.java @@ -0,0 +1,156 @@ +package org.xbib.netty.http.client.test.retry; + +import org.junit.Test; +import org.xbib.netty.http.client.retry.BackOff; +import org.xbib.netty.http.client.retry.ExponentialBackOff; +import org.xbib.netty.http.client.retry.NanoClock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests {@link ExponentialBackOff}. + */ +public class ExponentialBackOffTest { + + @Test + public void testConstructor() { + ExponentialBackOff backOffPolicy = new ExponentialBackOff(); + assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS, + backOffPolicy.getInitialIntervalMillis()); + assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS, + backOffPolicy.getCurrentIntervalMillis()); + assertEquals(ExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR, + backOffPolicy.getRandomizationFactor(), 1); + assertEquals(ExponentialBackOff.DEFAULT_MULTIPLIER, backOffPolicy.getMultiplier(), 1); + assertEquals( + ExponentialBackOff.DEFAULT_MAX_INTERVAL_MILLIS, backOffPolicy.getMaxIntervalMillis()); + assertEquals(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS, + backOffPolicy.getMaxElapsedTimeMillis()); + } + + @Test + public void testBuilder() { + ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder().build(); + assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS, + backOffPolicy.getInitialIntervalMillis()); + assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS, + backOffPolicy.getCurrentIntervalMillis()); + assertEquals(ExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR, + backOffPolicy.getRandomizationFactor(), 1); + assertEquals(ExponentialBackOff.DEFAULT_MULTIPLIER, backOffPolicy.getMultiplier(), 1); + assertEquals(ExponentialBackOff.DEFAULT_MAX_INTERVAL_MILLIS, backOffPolicy.getMaxIntervalMillis()); + assertEquals(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS, + backOffPolicy.getMaxElapsedTimeMillis()); + + int testInitialInterval = 1; + double testRandomizationFactor = 0.1; + double testMultiplier = 5.0; + int testMaxInterval = 10; + int testMaxElapsedTime = 900000; + + backOffPolicy = new ExponentialBackOff.Builder() + .setInitialIntervalMillis(testInitialInterval) + .setRandomizationFactor(testRandomizationFactor) + .setMultiplier(testMultiplier) + .setMaxIntervalMillis(testMaxInterval) + .setMaxElapsedTimeMillis(testMaxElapsedTime) + .build(); + assertEquals(testInitialInterval, backOffPolicy.getInitialIntervalMillis()); + assertEquals(testInitialInterval, backOffPolicy.getCurrentIntervalMillis()); + assertEquals(testRandomizationFactor, backOffPolicy.getRandomizationFactor(), 1); + assertEquals(testMultiplier, backOffPolicy.getMultiplier(), 1); + assertEquals(testMaxInterval, backOffPolicy.getMaxIntervalMillis()); + assertEquals(testMaxElapsedTime, backOffPolicy.getMaxElapsedTimeMillis()); + } + + @Test + public void testBackOff() { + int testInitialInterval = 500; + double testRandomizationFactor = 0.1; + double testMultiplier = 2.0; + int testMaxInterval = 5000; + int testMaxElapsedTime = 900000; + + ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder() + .setInitialIntervalMillis(testInitialInterval) + .setRandomizationFactor(testRandomizationFactor) + .setMultiplier(testMultiplier) + .setMaxIntervalMillis(testMaxInterval) + .setMaxElapsedTimeMillis(testMaxElapsedTime) + .build(); + int[] expectedResults = {500, 1000, 2000, 4000, 5000, 5000, 5000, 5000, 5000, 5000}; + for (int expected : expectedResults) { + assertEquals(expected, backOffPolicy.getCurrentIntervalMillis()); + // Assert that the next back off falls in the expected range. + int minInterval = (int) (expected - (testRandomizationFactor * expected)); + int maxInterval = (int) (expected + (testRandomizationFactor * expected)); + long actualInterval = backOffPolicy.nextBackOffMillis(); + assertTrue(minInterval <= actualInterval && actualInterval <= maxInterval); + } + } + + @Test + public void testGetRandomizedInterval() { + // 33% chance of being 1. + assertEquals(1, ExponentialBackOff.getRandomValueFromInterval(0.5, 0, 2)); + assertEquals(1, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.33, 2)); + // 33% chance of being 2. + assertEquals(2, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.34, 2)); + assertEquals(2, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.66, 2)); + // 33% chance of being 3. + assertEquals(3, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.67, 2)); + assertEquals(3, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.99, 2)); + } + + @Test + public void testGetElapsedTimeMillis() { + ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder().setNanoClock(new MyNanoClock()).build(); + long elapsedTimeMillis = backOffPolicy.getElapsedTimeMillis(); + assertEquals("elapsedTimeMillis=" + elapsedTimeMillis, 1000, elapsedTimeMillis); + } + + @Test + public void testMaxElapsedTime() { + ExponentialBackOff backOffPolicy = + new ExponentialBackOff.Builder().setNanoClock(new MyNanoClock(10000)).build(); + assertTrue(backOffPolicy.nextBackOffMillis() != BackOff.STOP); + // Change the currentElapsedTimeMillis to be 0 ensuring that the elapsed time will be greater + // than the max elapsed time. + backOffPolicy.setStartTimeNanos(0); + assertEquals(BackOff.STOP, backOffPolicy.nextBackOffMillis()); + } + + @Test + public void testBackOffOverflow() { + int testInitialInterval = Integer.MAX_VALUE / 2; + double testMultiplier = 2.1; + int testMaxInterval = Integer.MAX_VALUE; + ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder() + .setInitialIntervalMillis(testInitialInterval) + .setMultiplier(testMultiplier) + .setMaxIntervalMillis(testMaxInterval) + .build(); + backOffPolicy.nextBackOffMillis(); + // Assert that when an overflow is possible the current interval is set to the max interval. + assertEquals(testMaxInterval, backOffPolicy.getCurrentIntervalMillis()); + } + + static class MyNanoClock implements NanoClock { + + private int i = 0; + private long startSeconds; + + MyNanoClock() { + } + + MyNanoClock(long startSeconds) { + this.startSeconds = startSeconds; + } + + public long nanoTime() { + return (startSeconds + i++) * 1000000000; + } + } + +} diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java new file mode 100644 index 0000000..9fa01f6 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOff.java @@ -0,0 +1,75 @@ +package org.xbib.netty.http.client.test.retry; + +import org.xbib.netty.http.client.retry.BackOff; + +import java.io.IOException; + +/** + * Mock for {@link BackOff} that always returns a fixed number. + * + *

+ * Implementation is not thread-safe. + *

+ * + */ +public class MockBackOff implements BackOff { + + /** Fixed back-off milliseconds. */ + private long backOffMillis; + + /** Maximum number of tries before returning {@link #STOP}. */ + private int maxTries = 10; + + /** Number of tries so far. */ + private int numTries; + + public void reset() throws IOException { + numTries = 0; + } + + public long nextBackOffMillis() throws IOException { + if (numTries >= maxTries || backOffMillis == STOP) { + return STOP; + } + numTries++; + return backOffMillis; + } + + /** + * Sets the fixed back-off milliseconds (defaults to {@code 0}). + * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public MockBackOff setBackOffMillis(long backOffMillis) { + //Preconditions.checkArgument(backOffMillis == STOP || backOffMillis >= 0); + this.backOffMillis = backOffMillis; + return this; + } + + /** + * Sets the maximum number of tries before returning {@link #STOP} (defaults to {@code 10}). + * + *

+ * Overriding is only supported for the purpose of calling the super implementation and changing + * the return type, but nothing else. + *

+ */ + public MockBackOff setMaxTries(int maxTries) { + //Preconditions.checkArgument(maxTries >= 0); + this.maxTries = maxTries; + return this; + } + + /** Returns the maximum number of tries before returning {@link #STOP}. */ + public final int getMaxTries() { + return numTries; + } + + /** Returns the number of tries so far. */ + public final int getNumberOfTries() { + return numTries; + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOffTest.java b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOffTest.java new file mode 100644 index 0000000..1e79dc8 --- /dev/null +++ b/src/test/java/org/xbib/netty/http/client/test/retry/MockBackOffTest.java @@ -0,0 +1,27 @@ +package org.xbib.netty.http.client.test.retry; + +import org.junit.Test; +import org.xbib.netty.http.client.retry.BackOff; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link MockBackOff}. + */ +public class MockBackOffTest { + + @Test + public void testNextBackOffMillis() throws IOException { + subtestNextBackOffMillis(0, new MockBackOff()); + subtestNextBackOffMillis(BackOff.STOP, new MockBackOff().setBackOffMillis(BackOff.STOP)); + subtestNextBackOffMillis(42, new MockBackOff().setBackOffMillis(42)); + } + + private void subtestNextBackOffMillis(long expectedValue, BackOff backOffPolicy) throws IOException { + for (int i = 0; i < 10; i++) { + assertEquals(expectedValue, backOffPolicy.nextBackOffMillis()); + } + } +}