add backoff, add simple pool, clean listeners

This commit is contained in:
Jörg Prante 2018-03-04 22:50:46 +01:00
parent 047ae5bffd
commit 30ac8b09c2
45 changed files with 2183 additions and 821 deletions

View file

@ -37,6 +37,7 @@ dependencies {
compile "org.xbib:net-url:${project.property('xbib-net-url.version')}"
compile "io.netty:netty-codec-http2:${project.property('netty.version')}"
compile "io.netty:netty-handler-proxy:${project.property('netty.version')}"
testCompile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}"
testCompile "junit:junit:${project.property('junit.version')}"

View file

@ -1,8 +1,8 @@
group = org.xbib
name = netty-http-client
version = 4.1.19.1
version = 4.1.16.0
netty.version = 4.1.19.Final
netty.version = 4.1.16.Final
tcnative.version = 2.0.7.Final
conscrypt.version = 1.0.1
xbib-net-url.version = 1.1.0

Binary file not shown.

View file

@ -1,6 +1,6 @@
#Sun Feb 25 12:39:15 CET 2018
#Fri Mar 02 19:15:04 CET 2018
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-rc-1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-all.zip

View file

@ -18,15 +18,21 @@ import org.xbib.netty.http.client.handler.http1.HttpResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2ChannelInitializer;
import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2SettingsHandler;
import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool;
import org.xbib.netty.http.client.transport.Http2Transport;
import org.xbib.netty.http.client.transport.HttpTransport;
import org.xbib.netty.http.client.transport.Transport;
import org.xbib.netty.http.client.util.NetworkUtils;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.security.KeyStoreException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.logging.Level;
@ -62,6 +68,8 @@ public final class Client {
private TransportListener transportListener;
private Pool<Channel> pool;
public Client() {
this(new ClientConfig());
}
@ -74,6 +82,7 @@ public final class Client {
EventLoopGroup eventLoopGroup, Class<? extends SocketChannel> socketChannelClass) {
Objects.requireNonNull(clientConfig);
this.clientConfig = clientConfig;
initializeTrustManagerFactory(clientConfig);
this.byteBufAllocator = byteBufAllocator != null ?
byteBufAllocator : PooledByteBufAllocator.DEFAULT;
this.eventLoopGroup = eventLoopGroup != null ?
@ -94,6 +103,22 @@ public final class Client {
this.http2SettingsHandler = new Http2SettingsHandler();
this.http2ResponseHandler = new Http2ResponseHandler();
this.transports = new CopyOnWriteArrayList<>();
List<HttpAddress> nodes = clientConfig.getNodes();
if (!nodes.isEmpty()) {
Integer limit = clientConfig.getNodeConnectionLimit();
if (limit == null || limit > nodes.size()) {
limit = nodes.size();
}
if (limit < 1) {
limit = 1;
}
Semaphore semaphore = new Semaphore(limit);
Integer retries = clientConfig.getRetriesPerNode();
if (retries == null || retries < 0) {
retries = 0;
}
this.pool = new SimpleChannelPool<>(semaphore, nodes, bootstrap, null, retries);
}
}
public static ClientBuilder builder() {
@ -140,23 +165,28 @@ public final class Client {
public Channel newChannel(HttpAddress httpAddress) throws InterruptedException {
HttpVersion httpVersion = httpAddress.getVersion();
ChannelInitializer<SocketChannel> initializer;
Channel channel;
if (httpVersion.majorVersion() < 2) {
initializer = new HttpChannelInitializer(clientConfig, httpAddress, httpResponseHandler);
channel = bootstrap.handler(initializer)
.connect(httpAddress.getInetSocketAddress()).sync().await().channel();
} else {
initializer = new Http2ChannelInitializer(clientConfig, httpAddress, http2SettingsHandler, http2ResponseHandler);
}
return bootstrap.handler(initializer)
initializer = new Http2ChannelInitializer(clientConfig, httpAddress,
http2SettingsHandler, http2ResponseHandler);
channel = bootstrap.handler(initializer)
.connect(httpAddress.getInetSocketAddress()).sync().await().channel();
}
return channel;
}
public Transport execute(Request request) {
Transport nextTransport = newTransport(HttpAddress.of(request));
nextTransport.execute(request);
return nextTransport;
public Transport execute(Request request) throws IOException {
Transport transport = newTransport(HttpAddress.of(request));
transport.execute(request);
return transport;
}
public <T> CompletableFuture<T> execute(Request request,
Function<FullHttpResponse, T> supplier) {
Function<FullHttpResponse, T> supplier) throws IOException {
return newTransport(HttpAddress.of(request)).execute(request, supplier);
}
@ -165,19 +195,19 @@ public final class Client {
* @param transport the previous transport
* @param request the new request for continuing the request.
*/
public void continuation(Transport transport, Request request) {
public void continuation(Transport transport, Request request) throws IOException {
Transport nextTransport = newTransport(HttpAddress.of(request));
nextTransport.setResponseListener(transport.getResponseListener());
nextTransport.setExceptionListener(transport.getExceptionListener());
nextTransport.setHeadersListener(transport.getHeadersListener());
nextTransport.setCookieListener(transport.getCookieListener());
nextTransport.setPushListener(transport.getPushListener());
nextTransport.setCookieBox(transport.getCookieBox());
nextTransport.execute(request);
nextTransport.get();
close(nextTransport);
}
public void retry(Transport transport, Request request) throws IOException {
transport.execute(request);
transport.get();
close(transport);
}
public Transport prepareRequest(Request request) {
return newTransport(HttpAddress.of(request));
@ -206,6 +236,21 @@ public final class Client {
shutdown();
}
/**
* Initialize trust manager factory once per client lifecycle.
* @param clientConfig the client config
*/
private static void initializeTrustManagerFactory(ClientConfig clientConfig) {
TrustManagerFactory trustManagerFactory = clientConfig.getTrustManagerFactory();
if (trustManagerFactory != null) {
try {
trustManagerFactory.init(clientConfig.getTrustManagerKeyStore());
} catch (KeyStoreException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
}
}
public interface TransportListener {
void onOpen(Transport transport);

View file

@ -3,14 +3,14 @@ package org.xbib.netty.http.client;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.Provider;
import java.util.List;
public class ClientConfig {
@ -110,11 +110,6 @@ public class ClientConfig {
*/
CipherSuiteFilter CIPHER_SUITE_FILTER = SupportedCipherSuiteFilter.INSTANCE;
/**
* Default trust manager factory.
*/
TrustManagerFactory TRUST_MANAGER_FACTORY = InsecureTrustManagerFactory.INSTANCE;
boolean USE_SERVER_NAME_IDENTIFICATION = true;
/**
@ -123,6 +118,20 @@ public class ClientConfig {
ClientAuthMode SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE;
}
private static TrustManagerFactory TRUST_MANAGER_FACTORY;
static {
try {
TRUST_MANAGER_FACTORY = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
//InsecureTrustManagerFactory.INSTANCE;
//TRUST_MANAGER_FACTORY.init((KeyStore) null);
// java.lang.IllegalStateException: TrustManagerFactoryImpl is not initialized
//TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
} catch (Exception e) {
TRUST_MANAGER_FACTORY = null;
}
}
private boolean debug = Defaults.DEBUG;
/**
@ -165,7 +174,9 @@ public class ClientConfig {
private CipherSuiteFilter cipherSuiteFilter = Defaults.CIPHER_SUITE_FILTER;
private TrustManagerFactory trustManagerFactory = Defaults.TRUST_MANAGER_FACTORY;
private TrustManagerFactory trustManagerFactory = TRUST_MANAGER_FACTORY;
private KeyStore trustManagerKeyStore = null;
private boolean serverNameIdentification = Defaults.USE_SERVER_NAME_IDENTIFICATION;
@ -179,6 +190,12 @@ public class ClientConfig {
private HttpProxyHandler httpProxyHandler;
private List<HttpAddress> nodes;
private Integer nodeConnectionLimit;
private Integer retriesPerNode;
public ClientConfig setDebug(boolean debug) {
this.debug = debug;
return this;
@ -370,15 +387,6 @@ public class ClientConfig {
return cipherSuiteFilter;
}
public ClientConfig setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
this.trustManagerFactory = trustManagerFactory;
return this;
}
public TrustManagerFactory getTrustManagerFactory() {
return trustManagerFactory;
}
public ClientConfig setKeyCert(InputStream keyCertChainInputStream, InputStream keyInputStream) {
this.keyCertChainInputStream = keyCertChainInputStream;
this.keyInputStream = keyInputStream;
@ -423,6 +431,24 @@ public class ClientConfig {
return clientAuthMode;
}
public ClientConfig setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
this.trustManagerFactory = trustManagerFactory;
return this;
}
public TrustManagerFactory getTrustManagerFactory() {
return trustManagerFactory;
}
public ClientConfig setTrustManagerKeyStore(KeyStore trustManagerKeyStore) {
this.trustManagerKeyStore = trustManagerKeyStore;
return this;
}
public KeyStore getTrustManagerKeyStore() {
return trustManagerKeyStore;
}
public ClientConfig setHttpProxyHandler(HttpProxyHandler httpProxyHandler) {
this.httpProxyHandler = httpProxyHandler;
return this;
@ -432,6 +458,33 @@ public class ClientConfig {
return httpProxyHandler;
}
public ClientConfig setNodes(List<HttpAddress> nodes) {
this.nodes = nodes;
return this;
}
public List<HttpAddress> getNodes() {
return nodes;
}
public ClientConfig setNodeConnectionLimit(Integer nodeConnectionLimit) {
this.nodeConnectionLimit = nodeConnectionLimit;
return this;
}
public Integer getNodeConnectionLimit() {
return nodeConnectionLimit;
}
public ClientConfig setRetriesPerNode(Integer retriesPerNode) {
this.retriesPerNode = retriesPerNode;
return this;
}
public Integer getRetriesPerNode() {
return retriesPerNode;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View file

@ -2,13 +2,14 @@ package org.xbib.netty.http.client;
import io.netty.handler.codec.http.HttpVersion;
import org.xbib.net.URL;
import org.xbib.netty.http.client.pool.PoolKey;
import java.net.InetSocketAddress;
/**
* A handle for host, port, HTTP version, secure transport flag of a channel for HTTP.
*/
public class HttpAddress {
public class HttpAddress implements PoolKey {
private static final HttpVersion HTTP_2_0 = HttpVersion.valueOf("HTTP/2.0");

View file

@ -8,9 +8,7 @@ import io.netty.handler.codec.http.cookie.Cookie;
import org.xbib.net.URL;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.nio.charset.StandardCharsets;
@ -45,14 +43,10 @@ public class Request {
private HttpResponseListener responseListener;
private ExceptionListener exceptionListener;
private HttpHeadersListener headersListener;
private CookieListener cookieListener;
private HttpPushListener pushListener;
Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod,
HttpHeaders headers, Collection<Cookie> cookies,
String uri, ByteBuf content,
@ -120,12 +114,13 @@ public class Request {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("base=").append(base).append(',')
.append("version=").append(httpVersion).append(',')
.append("method=").append(httpMethod).append(',')
.append("relativeUri=").append(uri).append(',')
.append("headers=").append(headers).append(',')
.append("content=").append(content != null ? content.copy(0,16).toString(StandardCharsets.UTF_8) : "");
sb.append("Request[base='").append(base)
.append("',version=").append(httpVersion)
.append(",method=").append(httpMethod)
.append(",uri=").append(uri)
.append(",headers=").append(headers.entries())
.append(",content=").append(content != null ? content.copy(0,16).toString(StandardCharsets.UTF_8) : "")
.append("]");
return sb.toString();
}
@ -156,24 +151,6 @@ public class Request {
return responseListener;
}
public Request setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
return this;
}
public ExceptionListener getExceptionListener() {
return exceptionListener;
}
public Request setPushListener(HttpPushListener httpPushListener) {
this.pushListener = httpPushListener;
return this;
}
public HttpPushListener getPushListener() {
return pushListener;
}
public static RequestBuilder get() {
return builder(HttpMethod.GET);
}

View file

@ -13,6 +13,7 @@ import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.AsciiString;
import org.xbib.net.QueryParameters;
import org.xbib.net.URL;
import org.xbib.net.URLSyntaxException;
@ -67,7 +68,9 @@ public class RequestBuilder {
private URL url;
private QueryStringEncoder queryStringEncoder;
private String uri;
private QueryParameters queryParameters;
private ByteBuf content;
@ -90,6 +93,7 @@ public class RequestBuilder {
headers = new DefaultHttpHeaders();
removeHeaders = new ArrayList<>();
cookies = new HashSet<>();
queryParameters = new QueryParameters();
}
public RequestBuilder setMethod(HttpMethod httpMethod) {
@ -97,12 +101,12 @@ public class RequestBuilder {
return this;
}
public RequestBuilder setHttp1() {
public RequestBuilder enableHttp1() {
this.httpVersion = HttpVersion.HTTP_1_1;
return this;
}
public RequestBuilder setHttp2() {
public RequestBuilder enableHttp2() {
this.httpVersion = HTTP_2_0;
return this;
}
@ -122,32 +126,27 @@ public class RequestBuilder {
return this;
}
public RequestBuilder setURL(String url) {
return setURL(URL.from(url));
}
public RequestBuilder setURL(URL url) {
this.url = url;
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(URI.create(url.toString()), StandardCharsets.UTF_8);
this.queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
for (String value : entry.getValue()) {
queryStringEncoder.addParam(entry.getKey(), value);
}
}
public RequestBuilder remoteAddress(HttpAddress httpAddress) {
this.url = URL.builder()
.scheme(httpAddress.isSecure() ? "https" : "http")
.host(httpAddress.getInetSocketAddress().getHostString())
.port(httpAddress.getInetSocketAddress().getPort())
.build();
this.httpVersion = httpAddress.getVersion();
return this;
}
public RequestBuilder path(String path) {
if (this.url != null) {
try {
setURL(URL.base(url).resolve(path).toString());
} catch (URLSyntaxException e) {
throw new IllegalArgumentException(e);
public RequestBuilder url(String url) {
return url(URL.from(url));
}
} else {
setURL(path);
public RequestBuilder url(URL url) {
this.url = url;
return this;
}
public RequestBuilder uri(String uri) {
this.uri = uri;
return this;
}
@ -171,9 +170,9 @@ public class RequestBuilder {
return this;
}
public RequestBuilder addParam(String name, String value) {
if (queryStringEncoder != null) {
queryStringEncoder.addParam(name, value);
public RequestBuilder addParameter(String name, String value) {
if (queryParameters != null) {
queryParameters.add(name, value);
}
return this;
}
@ -213,11 +212,6 @@ public class RequestBuilder {
return this;
}
public RequestBuilder setContent(ByteBuf byteBuf) {
this.content = byteBuf;
return this;
}
public RequestBuilder text(String text) {
content(text, HttpHeaderValues.TEXT_PLAIN);
return this;
@ -233,6 +227,11 @@ public class RequestBuilder {
return this;
}
public RequestBuilder content(ByteBuf byteBuf) {
this.content = byteBuf;
return this;
}
public RequestBuilder content(CharSequence charSequence, String contentType) {
content(charSequence.toString().getBytes(StandardCharsets.UTF_8), AsciiString.of(contentType));
return this;
@ -253,8 +252,38 @@ public class RequestBuilder {
throw new IllegalStateException("URL not set");
}
if (url.getHost() == null) {
throw new IllegalStateException("URL host not set: " + url);
throw new IllegalStateException("host in URL not defined: " + url);
}
if (uri != null) {
if (this.url != null) {
try {
url = URL.base(url).resolve(uri);
} catch (URLSyntaxException e) {
throw new IllegalArgumentException(e);
}
} else {
url(uri);
}
}
// add explicit parameters to URL
queryParameters.forEach(param -> url.getQueryParams().add(param));
// let Netty's query string decoder/encoder work over the URL to add paramters given implicitly in url()
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(URI.create(url.toString()), StandardCharsets.UTF_8);
QueryStringEncoder queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
for (String value : entry.getValue()) {
queryStringEncoder.addParam(entry.getKey(), value);
}
}
// build uri from QueryStringDecoder
StringBuilder sb = new StringBuilder();
String pathAndQuery = queryStringEncoder.toString();
sb.append(pathAndQuery.isEmpty() ? "/" : pathAndQuery);
String ref = url.getFragment();
if (ref != null && !ref.isEmpty()) {
sb.append('#').append(ref);
}
String uri = sb.toString();
DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true);
validatedHeaders.set(headers);
String scheme = url.getScheme();
@ -290,23 +319,10 @@ public class RequestBuilder {
for (String headerName : removeHeaders) {
validatedHeaders.remove(headerName);
}
// create origin form from query string encoder
String uri = toOriginForm();
return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, uri, content,
timeout, followRedirect, maxRedirects, 0);
}
private String toOriginForm() {
StringBuilder sb = new StringBuilder();
String pathAndQuery = queryStringEncoder.toString();
sb.append(pathAndQuery.isEmpty() ? "/" : pathAndQuery);
String ref = url.getFragment();
if (ref != null && !ref.isEmpty()) {
sb.append('#').append(ref);
}
return sb.toString();
}
private void addHeader(AsciiString name, Object value) {
if (!headers.contains(name)) {
headers.add(name, value);

View file

@ -9,7 +9,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.handler.TrafficLoggingHandler;
@ -51,11 +50,15 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
try {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(clientConfig.getSslProvider())
.sslContextProvider(clientConfig.getSslContextProvider())
.keyManager(clientConfig.getKeyCertChainInputStream(), clientConfig.getKeyInputStream(),
clientConfig.getKeyPassword())
.ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter())
.trustManager(clientConfig.getTrustManagerFactory());
.ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter());
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
if (clientConfig.getTrustManagerFactory() != null) {
sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
}
SslContext sslContext = sslContextBuilder.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();

View file

@ -4,7 +4,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2SecurityUtil;
@ -18,7 +18,6 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.HttpAddress;
@ -60,10 +59,10 @@ public class Http2ChannelInitializer extends ChannelInitializer<SocketChannel> {
*/
@Override
protected void initChannel(SocketChannel ch) {
DefaultHttp2Connection http2Connection = new DefaultHttp2Connection(false);
Http2Connection http2Connection = new DefaultHttp2Connection(false);
HttpToHttp2ConnectionHandlerBuilder http2ConnectionHandlerBuilder = new HttpToHttp2ConnectionHandlerBuilder()
.connection(http2Connection)
.frameListener(new DelegatingDecompressorFrameListener(http2Connection,
.frameListener(new Http2PushPromiseHandler(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection)
.maxContentLength(clientConfig.getMaxContentLength())
.propagateSettings(true)
@ -75,7 +74,6 @@ public class Http2ChannelInitializer extends ChannelInitializer<SocketChannel> {
try {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(clientConfig.getSslProvider())
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
@ -85,6 +83,9 @@ public class Http2ChannelInitializer extends ChannelInitializer<SocketChannel> {
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
if (clientConfig.getTrustManagerFactory() != null) {
sslContextBuilder.trustManager(clientConfig.getTrustManagerFactory());
}
SslContext sslContext = sslContextBuilder.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();

View file

@ -0,0 +1,24 @@
package org.xbib.netty.http.client.handler.http2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameListener;
import io.netty.handler.codec.http2.Http2Headers;
import org.xbib.netty.http.client.transport.Transport;
public class Http2PushPromiseHandler extends DelegatingDecompressorFrameListener {
public Http2PushPromiseHandler(Http2Connection connection, Http2FrameListener listener) {
super(connection, listener);
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
super.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
transport.pushPromiseReceived(streamId, promisedStreamId, headers);
}
}

View file

@ -20,11 +20,6 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
transport.responseReceived(streamId, httpResponse);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// do nothing
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.fireChannelInactive();

View file

@ -1,10 +0,0 @@
package org.xbib.netty.http.client.listener;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.Http2Headers;
@FunctionalInterface
public interface HttpPushListener {
void onPushReceived(Http2Headers headers, FullHttpResponse fullHttpResponse);
}

View file

@ -1,23 +0,0 @@
package org.xbib.netty.http.client.pool;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import java.io.Closeable;
import java.net.ConnectException;
import java.util.List;
public interface ChannelPool extends Closeable {
AttributeKey<String> NODE_ATTRIBUTE_KEY = AttributeKey.valueOf("node");
void prepare(int count) throws ConnectException;
Channel lease() throws ConnectException;
int lease(List<Channel> channels, int maxCount) throws ConnectException;
void release(Channel channel);
void release(List<Channel> channels);
}

View file

@ -0,0 +1,17 @@
package org.xbib.netty.http.client.pool;
import java.io.Closeable;
import java.util.List;
public interface Pool<T> extends Closeable {
void prepare(int count) throws Exception;
T acquire() throws Exception;
int acquire(List<T> list, int maxCount) throws Exception;
void release(T t) throws Exception;
void release(List<T> list) throws Exception;
}

View file

@ -0,0 +1,8 @@
package org.xbib.netty.http.client.pool;
import java.net.InetSocketAddress;
public interface PoolKey {
InetSocketAddress getInetSocketAddress();
}

View file

@ -6,9 +6,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.AttributeKey;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -19,73 +19,74 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public class SimpleChannelPool implements ChannelPool {
public class SimpleChannelPool<K extends PoolKey> implements Pool<Channel> {
private static final Logger logger = Logger.getLogger(SimpleChannelPool.class.getName());
private final Semaphore semaphore;
private final List<String> nodes;
private final ChannelPoolHandler channelPoolhandler;
private final List<K> nodes;
private final int numberOfNodes;
private final int retriesPerNode;
private final Map<String, Bootstrap> bootstraps;
private final Map<K, Bootstrap> bootstraps;
private final Map<String, List<Channel>> channels;
private final Map<K, List<Channel>> channels;
private final Map<String, Queue<Channel>> availableChannels;
private final Map<K, Queue<Channel>> availableChannels;
private final Map<String, Integer> counts;
private final Map<K, Integer> counts;
private final Map<String, Integer> failedCounts;
private final Map<K, Integer> failedCounts;
private final Lock lock = new ReentrantLock();
private final Lock lock;
private final AttributeKey<K> attributeKey;
/**
* @param semaphore the throttle for the concurrency level control
* @param semaphore the concurrency level
* @param nodes the endpoint nodes, any element may contain the port (followed after ":")
* to override the defaultPort argument
* @param bootstrap bootstrap instance
* @param channelPoolHandler channel pool handler being notified upon new connection is created
* @param defaultPort default port used to connect (any node address from the nodes set may override this)
* @param retriesPerNode the max count of the subsequent connection failures to the node before
* the node will be excluded from the pool, 0 means no limit
* the node will be excluded from the pool. If set to 0, the value is ignored.
*/
public SimpleChannelPool(Semaphore semaphore, List<String> nodes, Bootstrap bootstrap,
ChannelPoolHandler channelPoolHandler, int defaultPort, int retriesPerNode) {
public SimpleChannelPool(Semaphore semaphore, List<K> nodes, Bootstrap bootstrap,
ChannelPoolHandler channelPoolHandler, int retriesPerNode) {
this.semaphore = semaphore;
this.channelPoolhandler = channelPoolHandler;
this.nodes = nodes;
this.retriesPerNode = retriesPerNode;
this.lock = new ReentrantLock();
this.attributeKey = AttributeKey.valueOf("poolKey");
if (nodes == null || nodes.isEmpty()) {
throw new IllegalArgumentException("empty nodes array argument");
}
this.nodes = nodes;
this.retriesPerNode = retriesPerNode;
this.numberOfNodes = nodes.size();
bootstraps = new HashMap<>(numberOfNodes);
channels = new HashMap<>(numberOfNodes);
availableChannels = new HashMap<>(numberOfNodes);
counts = new HashMap<>(numberOfNodes);
failedCounts = new HashMap<>(numberOfNodes);
for (String node : nodes) {
InetSocketAddress nodeAddr;
if (node.contains(":")) {
String addrParts[] = node.split(":");
nodeAddr = new InetSocketAddress(addrParts[0], Integer.parseInt(addrParts[1]));
} else {
nodeAddr = new InetSocketAddress(node, defaultPort);
}
bootstraps.put(node, bootstrap.clone().remoteAddress(nodeAddr)
for (K node : nodes) {
bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress())
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel conn) throws Exception {
if(!conn.eventLoop().inEventLoop()) {
throw new AssertionError();
protected void initChannel(Channel channel) throws Exception {
if(!channel.eventLoop().inEventLoop()) {
throw new IllegalStateException();
}
if (channelPoolHandler != null) {
channelPoolHandler.channelCreated(channel);
}
channelPoolHandler.channelCreated(conn);
}
}));
availableChannels.put(node, new ConcurrentLinkedQueue<>());
@ -100,9 +101,9 @@ public class SimpleChannelPool implements ChannelPool {
for (int i = 0; i < count; i ++) {
Channel channel = connectToAnyNode();
if (channel == null) {
throw new ConnectException("Failed to pre-create the connections to the target nodes");
throw new ConnectException("failed to prepare the connections");
}
String nodeAddr = channel.attr(NODE_ATTRIBUTE_KEY).get();
K nodeAddr = channel.attr(attributeKey).get();
if (channel.isActive()) {
Queue<Channel> channelQueue = availableChannels.get(nodeAddr);
if (channelQueue != null) {
@ -112,49 +113,117 @@ public class SimpleChannelPool implements ChannelPool {
channel.close();
}
}
logger.info("prepared " + count + " connections");
logger.log(Level.FINE,"prepared " + count + " connections");
} else {
throw new IllegalArgumentException("Connection count should be > 0, but got " + count);
}
}
private class CloseChannelListener implements ChannelFutureListener {
private final String nodeAddr;
private final Channel conn;
private CloseChannelListener(String nodeAddr, Channel conn) {
this.nodeAddr = nodeAddr;
this.conn = conn;
@Override
public Channel acquire() throws Exception {
Channel channel = null;
if (semaphore.tryAcquire()) {
if ((channel = poll()) == null) {
channel = connectToAnyNode();
}
if (channel == null) {
semaphore.release();
throw new ConnectException();
}
}
if (channelPoolhandler != null) {
channelPoolhandler.channelAcquired(channel);
}
return channel;
}
@Override
public void operationComplete(ChannelFuture future) {
logger.fine("connection to " + nodeAddr + " closed");
lock.lock();
try {
synchronized (counts) {
if(counts.containsKey(nodeAddr)) {
counts.put(nodeAddr, counts.get(nodeAddr) - 1);
public int acquire(List<Channel> channels, int maxCount) throws Exception {
int availableCount = semaphore.drainPermits();
if (availableCount == 0) {
return availableCount;
}
if (availableCount > maxCount) {
semaphore.release(availableCount - maxCount);
availableCount = maxCount;
}
Channel channel;
for (int i = 0; i < availableCount; i ++) {
if (null == (channel = poll())) {
channel = connectToAnyNode();
}
if (channel == null) {
semaphore.release(availableCount - i);
throw new ConnectException();
} else {
if (channelPoolhandler != null) {
channelPoolhandler.channelAcquired(channel);
}
channels.add(channel);
}
}
synchronized (channels) {
List<Channel> nodeConns = channels.get(nodeAddr);
if(nodeConns != null) {
nodeConns.remove(conn);
return availableCount;
}
@Override
public void release(Channel channel) throws Exception {
K nodeAddr = channel.attr(attributeKey).get();
if (channel.isActive()) {
Queue<Channel> channelQueue = availableChannels.get(nodeAddr);
if (channelQueue != null) {
channelQueue.add(channel);
}
semaphore.release();
} else {
channel.close();
}
if (channelPoolhandler != null) {
channelPoolhandler.channelReleased(channel);
}
}
@Override
public void release(List<Channel> channels) throws Exception {
for (Channel channel : channels) {
release(channel);
}
}
@Override
public void close() {
lock.lock();
try {
int closedConnCount = 0;
for (K nodeAddr : availableChannels.keySet()) {
for (Channel conn : availableChannels.get(nodeAddr)) {
if (conn.isOpen()) {
conn.close();
closedConnCount++;
}
}
}
availableChannels.clear();
for (K nodeAddr : channels.keySet()) {
for (Channel channel : channels.get(nodeAddr)) {
if (channel != null && channel.isOpen()) {
channel.close();
closedConnCount++;
}
}
}
channels.clear();
bootstraps.clear();
counts.clear();
logger.log(Level.FINE, "closed " + closedConnCount + " connections");
} finally {
lock.unlock();
}
}
}
private Channel connectToAnyNode() throws ConnectException {
Channel channel = null;
String nodeAddr = null;
String nextNodeAddr;
K nodeAddr = null;
K nextNodeAddr;
int min = Integer.MAX_VALUE;
int next;
int i = ThreadLocalRandom.current().nextInt(numberOfNodes);
@ -170,28 +239,28 @@ public class SimpleChannelPool implements ChannelPool {
}
}
if (nodeAddr != null) {
logger.fine("trying connection to " + nodeAddr);
logger.log(Level.FINE, "trying connection to " + nodeAddr);
try {
channel = connect(nodeAddr);
} catch (Exception e) {
logger.warning("failed to create a new connection to " + nodeAddr + ": " + e.toString());
logger.log(Level.WARNING, "failed to create a new connection to " + nodeAddr + ": " + e.toString());
if (retriesPerNode > 0) {
int selectedNodeFailedConnAttemptsCount = failedCounts.get(nodeAddr) + 1;
failedCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount);
if (selectedNodeFailedConnAttemptsCount > retriesPerNode) {
logger.warning("Failed to connect to the node \"" + nodeAddr + "\" "
+ selectedNodeFailedConnAttemptsCount + " times successively, "
+ "excluding the node from the connection pool forever");
logger.log(Level.WARNING, "failed to connect to the node " + nodeAddr + " "
+ selectedNodeFailedConnAttemptsCount + " times, "
+ "excluding the node from the connection pool");
counts.put(nodeAddr, Integer.MAX_VALUE);
boolean allNodesExcluded = true;
for (String node : nodes) {
for (K node : nodes) {
if (counts.get(node) < Integer.MAX_VALUE) {
allNodesExcluded = false;
break;
}
}
if (allNodesExcluded) {
logger.severe("no endpoint nodes left in the connection pool");
logger.log(Level.SEVERE, "no nodes left in the connection pool");
}
}
}
@ -204,20 +273,20 @@ public class SimpleChannelPool implements ChannelPool {
}
if (channel != null) {
channel.closeFuture().addListener(new CloseChannelListener(nodeAddr, channel));
channel.attr(NODE_ATTRIBUTE_KEY).set(nodeAddr);
channels.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(channel);
channel.attr(attributeKey).set(nodeAddr);
channels.computeIfAbsent(nodeAddr, node -> new ArrayList<>()).add(channel);
synchronized (counts) {
counts.put(nodeAddr, counts.get(nodeAddr) + 1);
}
if(retriesPerNode > 0) {
failedCounts.put(nodeAddr, 0);
}
logger.fine("new connection to " + nodeAddr + " created");
logger.log(Level.FINE,"new connection to " + nodeAddr + " created");
}
return channel;
}
protected Channel connect(String addr) throws Exception {
private Channel connect(K addr) throws Exception {
Bootstrap bootstrap = bootstraps.get(addr);
if (bootstrap != null) {
return bootstrap.connect().sync().channel();
@ -241,100 +310,36 @@ public class SimpleChannelPool implements ChannelPool {
return null;
}
@Override
public Channel lease() throws ConnectException {
Channel conn = null;
if (semaphore.tryAcquire()) {
if (null == (conn = poll())) {
conn = connectToAnyNode();
}
if (conn == null) {
semaphore.release();
throw new ConnectException();
}
}
return conn;
private class CloseChannelListener implements ChannelFutureListener {
private final K nodeAddr;
private final Channel channel;
private CloseChannelListener(K nodeAddr, Channel channel) {
this.nodeAddr = nodeAddr;
this.channel = channel;
}
@Override
public int lease(List<Channel> channels, int maxCount) throws ConnectException {
int availableCount = semaphore.drainPermits();
if (availableCount == 0) {
return availableCount;
}
if (availableCount > maxCount) {
semaphore.release(availableCount - maxCount);
availableCount = maxCount;
}
Channel conn;
for (int i = 0; i < availableCount; i ++) {
if (null == (conn = poll())) {
conn = connectToAnyNode();
}
if (conn == null) {
semaphore.release(availableCount - i);
throw new ConnectException();
} else {
channels.add(conn);
}
}
return availableCount;
}
@Override
public void release(Channel conn) {
String nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get();
if( conn.isActive()) {
Queue<Channel> connQueue = availableChannels.get(nodeAddr);
if (connQueue != null) {
connQueue.add(conn);
}
semaphore.release();
} else {
conn.close();
}
}
@Override
public void release(List<Channel> conns) {
String nodeAddr;
Queue<Channel> connQueue;
for (Channel conn : conns) {
nodeAddr = conn.attr(NODE_ATTRIBUTE_KEY).get();
if (conn.isActive()) {
connQueue = availableChannels.get(nodeAddr);
connQueue.add(conn);
semaphore.release();
} else {
conn.close();
}
}
}
@Override
public void close() {
public void operationComplete(ChannelFuture future) {
logger.log(Level.FINE,"connection to " + nodeAddr + " closed");
lock.lock();
int closedConnCount = 0;
for (String nodeAddr: availableChannels.keySet()) {
for (Channel conn: availableChannels.get(nodeAddr)) {
if (conn.isOpen()) {
conn.close();
closedConnCount ++;
try {
synchronized (counts) {
if (counts.containsKey(nodeAddr)) {
counts.put(nodeAddr, counts.get(nodeAddr) - 1);
}
}
synchronized (channels) {
List<Channel> channels = SimpleChannelPool.this.channels.get(nodeAddr);
if (channels != null) {
channels.remove(channel);
}
}
semaphore.release();
} finally {
lock.unlock();
}
}
}
availableChannels.clear();
for (String nodeAddr: channels.keySet()) {
for (Channel conn: channels.get(nodeAddr)) {
if (conn.isOpen()) {
conn.close();
closedConnCount ++;
}
}
}
channels.clear();
bootstraps.clear();
counts.clear();
logger.fine("closed " + closedConnCount + " connections");
}
}

View file

@ -11,7 +11,6 @@ import org.xbib.netty.http.client.RequestBuilder;
import org.xbib.netty.http.client.transport.Transport;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;
@ -55,21 +54,14 @@ public class RestClient {
Client client = new Client();
Transport transport = client.newTransport(HttpAddress.http1(url));
RestClient restClient = new RestClient(client, transport);
transport.setResponseListener(restClient::setResponse);
try {
transport.connect();
} catch (InterruptedException e) {
throw new ConnectException("unable to connect to " + url);
}
transport.awaitSettings();
RequestBuilder requestBuilder = Request.builder(httpMethod);
requestBuilder.setURL(url);
requestBuilder.url(url);
if (body != null && charset != null) {
ByteBuf byteBuf = client.getByteBufAllocator().buffer();
byteBuf.writeCharSequence(body, charset);
requestBuilder.setContent(byteBuf);
requestBuilder.content(byteBuf);
}
transport.execute(requestBuilder.build()).get();
transport.execute(requestBuilder.build().setResponseListener(restClient::setResponse)).get();
return restClient;
}

View file

@ -0,0 +1,65 @@
package org.xbib.netty.http.client.retry;
import java.io.IOException;
/**
* Back-off policy when retrying an operation.
*/
public interface BackOff {
/**
* Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */
long STOP = -1L;
/**
* Reset to initial state.
*/
void reset() throws IOException;
/**
* Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
* indicate that no retries should be made.
*
* <p>
* Example usage:
* </p>
*
* <pre>
long backOffMillis = backoff.nextBackOffMillis();
if (backOffMillis == Backoff.STOP) {
// do not retry operation
} else {
// sleep for backOffMillis milliseconds and retry operation
}
* </pre>
*/
long nextBackOffMillis() throws IOException;
/**
* Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried
* immediately without waiting.
*/
BackOff ZERO_BACKOFF = new BackOff() {
public void reset() {
}
public long nextBackOffMillis() {
return 0;
}
};
/**
* Fixed back-off policy that always returns {@code #STOP} for {@link #nextBackOffMillis()},
* meaning that the operation should not be retried.
*/
BackOff STOP_BACKOFF = new BackOff() {
public void reset() {
}
public long nextBackOffMillis() {
return STOP;
}
};
}

View file

@ -0,0 +1,487 @@
package org.xbib.netty.http.client.retry;
/**
* Implementation of {@link BackOff} that increases the back off period for each retry attempt using
* a randomization function that grows exponentially.
*
* <p>
* {@link #nextBackOffMillis()} is calculated using the following formula:
* </p>
*
* <pre>
randomized_interval =
retry_interval * (random value in range [1 - randomization_factor, 1 + randomization_factor])
* </pre>
*
* <p>
* In other words {@link #nextBackOffMillis()} will range between the randomization factor
* percentage below and above the retry interval. For example, using 2 seconds as the base retry
* interval and 0.5 as the randomization factor, the actual back off period used in the next retry
* attempt will be between 1 and 3 seconds.
* </p>
*
* <p>
* <b>Note:</b> max_interval caps the retry_interval and not the randomized_interval.
* </p>
*
* <p>
* If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the
* max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning
* {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}.
* </p>
*
* <p>
* Example: The default retry_interval is .5 seconds, default randomization_factor is 0.5, default
* multiplier is 1.5 and the default max_interval is 1 minute. For 10 tries the sequence will be
* (values in seconds) and assuming we go over the max_elapsed_time on the 10th try:
* </p>
*
* <pre>
request# retry_interval randomized_interval
1 0.5 [0.25, 0.75]
2 0.75 [0.375, 1.125]
3 1.125 [0.562, 1.687]
4 1.687 [0.8435, 2.53]
5 2.53 [1.265, 3.795]
6 3.795 [1.897, 5.692]
7 5.692 [2.846, 8.538]
8 8.538 [4.269, 12.807]
9 12.807 [6.403, 19.210]
10 19.210 {@link BackOff#STOP}
* </pre>
*
* <p>
* Implementation is not thread-safe.
* </p>
*/
public class ExponentialBackOff implements BackOff {
/** The default initial interval value in milliseconds (0.5 seconds). */
public static final int DEFAULT_INITIAL_INTERVAL_MILLIS = 500;
/**
* The default randomization factor (0.5 which results in a random period ranging between 50%
* below and 50% above the retry interval).
*/
public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
/** The default multiplier value (1.5 which is 50% increase per back off). */
public static final double DEFAULT_MULTIPLIER = 1.5;
/** The default maximum back off time in milliseconds (1 minute). */
public static final int DEFAULT_MAX_INTERVAL_MILLIS = 60000;
/** The default maximum elapsed time in milliseconds (15 minutes). */
public static final int DEFAULT_MAX_ELAPSED_TIME_MILLIS = 900000;
/** The current retry interval in milliseconds. */
private int currentIntervalMillis;
/** The initial retry interval in milliseconds. */
private final int initialIntervalMillis;
/**
* The randomization factor to use for creating a range around the retry interval.
*
* <p>
* A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
* above the retry interval.
* </p>
*/
private final double randomizationFactor;
/** The value to multiply the current interval with for each retry attempt. */
private final double multiplier;
/**
* The maximum value of the back off period in milliseconds. Once the retry interval reaches this
* value it stops increasing.
*/
private final int maxIntervalMillis;
/**
* The system time in nanoseconds. It is calculated when an ExponentialBackOffPolicy instance is
* created and is reset when {@link #reset()} is called.
*/
private long startTimeNanos;
/**
* The maximum elapsed time after instantiating {@link ExponentialBackOff} or calling
* {@link #reset()} after which {@link #nextBackOffMillis()} returns {@link BackOff#STOP}.
*/
private final int maxElapsedTimeMillis;
/** Nano clock. */
private final NanoClock nanoClock;
/**
* Creates an instance of ExponentialBackOffPolicy using default values.
*
* <p>
* To override the defaults use {@link Builder}.
* </p>
*
* <ul>
* <li>{@code initialIntervalMillis} defaults to {@link #DEFAULT_INITIAL_INTERVAL_MILLIS}</li>
* <li>{@code randomizationFactor} defaults to {@link #DEFAULT_RANDOMIZATION_FACTOR}</li>
* <li>{@code multiplier} defaults to {@link #DEFAULT_MULTIPLIER}</li>
* <li>{@code maxIntervalMillis} defaults to {@link #DEFAULT_MAX_INTERVAL_MILLIS}</li>
* <li>{@code maxElapsedTimeMillis} defaults in {@link #DEFAULT_MAX_ELAPSED_TIME_MILLIS}</li>
* </ul>
*/
public ExponentialBackOff() {
this(new Builder());
}
/**
* @param builder builder
*/
protected ExponentialBackOff(Builder builder) {
initialIntervalMillis = builder.initialIntervalMillis;
randomizationFactor = builder.randomizationFactor;
multiplier = builder.multiplier;
maxIntervalMillis = builder.maxIntervalMillis;
maxElapsedTimeMillis = builder.maxElapsedTimeMillis;
nanoClock = builder.nanoClock;
//Preconditions.checkArgument(initialIntervalMillis > 0);
//Preconditions.checkArgument(0 <= randomizationFactor && randomizationFactor < 1);
//Preconditions.checkArgument(multiplier >= 1);
//Preconditions.checkArgument(maxIntervalMillis >= initialIntervalMillis);
//Preconditions.checkArgument(maxElapsedTimeMillis > 0);
reset();
}
/** Sets the interval back to the initial retry interval and restarts the timer. */
public final void reset() {
currentIntervalMillis = initialIntervalMillis;
startTimeNanos = nanoClock.nanoTime();
}
public void setStartTimeNanos(long startTimeNanos) {
this.startTimeNanos = startTimeNanos;
}
/**
* {@inheritDoc}
*
* <p>
* This method calculates the next back off interval using the formula: randomized_interval =
* retry_interval +/- (randomization_factor * retry_interval)
* </p>
*
* <p>
* Subclasses may override if a different algorithm is required.
* </p>
*/
public long nextBackOffMillis() {
// Make sure we have not gone over the maximum elapsed time.
if (getElapsedTimeMillis() > maxElapsedTimeMillis) {
return STOP;
}
int randomizedInterval =
getRandomValueFromInterval(randomizationFactor, Math.random(), currentIntervalMillis);
incrementCurrentInterval();
return randomizedInterval;
}
/**
* Returns a random value from the interval [randomizationFactor * currentInterval,
* randomizationFactor * currentInterval].
*/
public static int getRandomValueFromInterval(double randomizationFactor, double random, int currentIntervalMillis) {
double delta = randomizationFactor * currentIntervalMillis;
double minInterval = currentIntervalMillis - delta;
double maxInterval = currentIntervalMillis + delta;
// Get a random value from the range [minInterval, maxInterval].
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
// we want a 33% chance for selecting either 1, 2 or 3.
return (int) (minInterval + (random * (maxInterval - minInterval + 1)));
}
/** Returns the initial retry interval in milliseconds. */
public final int getInitialIntervalMillis() {
return initialIntervalMillis;
}
/**
* Returns the randomization factor to use for creating a range around the retry interval.
*
* <p>
* A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
* above the retry interval.
* </p>
*/
public final double getRandomizationFactor() {
return randomizationFactor;
}
/**
* Returns the current retry interval in milliseconds.
*/
public final int getCurrentIntervalMillis() {
return currentIntervalMillis;
}
/**
* Returns the value to multiply the current interval with for each retry attempt.
*/
public final double getMultiplier() {
return multiplier;
}
/**
* Returns the maximum value of the back off period in milliseconds. Once the current interval
* reaches this value it stops increasing.
*/
public final int getMaxIntervalMillis() {
return maxIntervalMillis;
}
/**
* Returns the maximum elapsed time in milliseconds.
*
* <p>
* If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the
* max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning
* {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}.
* </p>
*/
public final int getMaxElapsedTimeMillis() {
return maxElapsedTimeMillis;
}
/**
* Returns the elapsed time in milliseconds since an {@link ExponentialBackOff} instance is
* created and is reset when {@link #reset()} is called.
*
* <p>
* The elapsed time is computed using {@link System#nanoTime()}.
* </p>
*/
public final long getElapsedTimeMillis() {
return (nanoClock.nanoTime() - startTimeNanos) / 1000000;
}
/**
* Increments the current interval by multiplying it with the multiplier.
*/
private void incrementCurrentInterval() {
// Check for overflow, if overflow is detected set the current interval to the max interval.
if (currentIntervalMillis >= maxIntervalMillis / multiplier) {
currentIntervalMillis = maxIntervalMillis;
} else {
currentIntervalMillis *= multiplier;
}
}
/**
* Builder for {@link ExponentialBackOff}.
*
* <p>
* Implementation is not thread-safe.
* </p>
*/
public static class Builder {
/** The initial retry interval in milliseconds. */
int initialIntervalMillis = DEFAULT_INITIAL_INTERVAL_MILLIS;
/**
* The randomization factor to use for creating a range around the retry interval.
*
* <p>
* A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
* above the retry interval.
* </p>
*/
double randomizationFactor = DEFAULT_RANDOMIZATION_FACTOR;
/** The value to multiply the current interval with for each retry attempt. */
double multiplier = DEFAULT_MULTIPLIER;
/**
* The maximum value of the back off period in milliseconds. Once the retry interval reaches
* this value it stops increasing.
*/
int maxIntervalMillis = DEFAULT_MAX_INTERVAL_MILLIS;
/**
* The maximum elapsed time in milliseconds after instantiating {@link ExponentialBackOff} or
* calling {@link #reset()} after which {@link #nextBackOffMillis()} returns
* {@link BackOff#STOP}.
*/
int maxElapsedTimeMillis = DEFAULT_MAX_ELAPSED_TIME_MILLIS;
/** Nano clock. */
NanoClock nanoClock = NanoClock.SYSTEM;
public Builder() {
}
/** Builds a new instance of {@link ExponentialBackOff}. */
public ExponentialBackOff build() {
return new ExponentialBackOff(this);
}
/**
* Returns the initial retry interval in milliseconds. The default value is
* {@link #DEFAULT_INITIAL_INTERVAL_MILLIS}.
*/
public final int getInitialIntervalMillis() {
return initialIntervalMillis;
}
/**
* Sets the initial retry interval in milliseconds. The default value is
* {@link #DEFAULT_INITIAL_INTERVAL_MILLIS}. Must be {@code > 0}.
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public Builder setInitialIntervalMillis(int initialIntervalMillis) {
this.initialIntervalMillis = initialIntervalMillis;
return this;
}
/**
* Returns the randomization factor to use for creating a range around the retry interval. The
* default value is {@link #DEFAULT_RANDOMIZATION_FACTOR}.
*
* <p>
* A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
* above the retry interval.
* </p>
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public final double getRandomizationFactor() {
return randomizationFactor;
}
/**
* Sets the randomization factor to use for creating a range around the retry interval. The
* default value is {@link #DEFAULT_RANDOMIZATION_FACTOR}. Must fall in the range
* {@code 0 <= randomizationFactor < 1}.
*
* <p>
* A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
* above the retry interval.
* </p>
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public Builder setRandomizationFactor(double randomizationFactor) {
this.randomizationFactor = randomizationFactor;
return this;
}
/**
* Returns the value to multiply the current interval with for each retry attempt. The default
* value is {@link #DEFAULT_MULTIPLIER}.
*/
public final double getMultiplier() {
return multiplier;
}
/**
* Sets the value to multiply the current interval with for each retry attempt. The default
* value is {@link #DEFAULT_MULTIPLIER}. Must be {@code >= 1}.
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public Builder setMultiplier(double multiplier) {
this.multiplier = multiplier;
return this;
}
/**
* Returns the maximum value of the back off period in milliseconds. Once the current interval
* reaches this value it stops increasing. The default value is
* {@link #DEFAULT_MAX_INTERVAL_MILLIS}. Must be {@code >= initialInterval}.
*/
public final int getMaxIntervalMillis() {
return maxIntervalMillis;
}
/**
* Sets the maximum value of the back off period in milliseconds. Once the current interval
* reaches this value it stops increasing. The default value is
* {@link #DEFAULT_MAX_INTERVAL_MILLIS}.
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public Builder setMaxIntervalMillis(int maxIntervalMillis) {
this.maxIntervalMillis = maxIntervalMillis;
return this;
}
/**
* Returns the maximum elapsed time in milliseconds. The default value is
* {@link #DEFAULT_MAX_ELAPSED_TIME_MILLIS}.
*
* <p>
* If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the
* max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning
* {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}.
* </p>
*/
public final int getMaxElapsedTimeMillis() {
return maxElapsedTimeMillis;
}
/**
* Sets the maximum elapsed time in milliseconds. The default value is
* {@link #DEFAULT_MAX_ELAPSED_TIME_MILLIS}. Must be {@code > 0}.
*
* <p>
* If the time elapsed since an {@link ExponentialBackOff} instance is created goes past the
* max_elapsed_time then the method {@link #nextBackOffMillis()} starts returning
* {@link BackOff#STOP}. The elapsed time can be reset by calling {@link #reset()}.
* </p>
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public Builder setMaxElapsedTimeMillis(int maxElapsedTimeMillis) {
this.maxElapsedTimeMillis = maxElapsedTimeMillis;
return this;
}
/**
* Returns the nano clock.
*/
public final NanoClock getNanoClock() {
return nanoClock;
}
/**
* Sets the nano clock ({@link NanoClock#SYSTEM} by default).
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public Builder setNanoClock(NanoClock nanoClock) {
this.nanoClock = nanoClock; //Preconditions.checkNotNull(nanoClock);
return this;
}
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright (c) 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.xbib.netty.http.client.retry;
/**
* Nano clock which can be used to measure elapsed time in nanoseconds.
*
* <p>
* The default system implementation can be accessed at {@link #SYSTEM}. Alternative implementations
* may be used for testing.
* </p>
*
* @since 1.14
* @author Yaniv Inbar
*/
public interface NanoClock {
/**
* Returns the current value of the most precise available system timer, in nanoseconds for use to
* measure elapsed time, to match the behavior of {@link System#nanoTime()}.
*/
long nanoTime();
/**
* Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
*/
NanoClock SYSTEM = new NanoClock() {
public long nanoTime() {
return System.nanoTime();
}
};
}

View file

@ -16,12 +16,9 @@ import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.RequestBuilder;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnmappableCharacterException;
@ -51,16 +48,6 @@ abstract class BaseTransport implements Transport {
protected SortedMap<Integer, Request> requests;
protected HttpResponseListener responseListener;
protected ExceptionListener exceptionListener;
protected HttpHeadersListener httpHeadersListener;
protected CookieListener cookieListener;
protected HttpPushListener pushListener;
private Map<Cookie, Boolean> cookieBox;
BaseTransport(Client client, HttpAddress httpAddress) {
@ -75,31 +62,8 @@ abstract class BaseTransport implements Transport {
}
@Override
public void connect() throws InterruptedException {
channel = client.newChannel(httpAddress);
channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
}
@Override
public Channel channel() {
return channel;
}
@Override
public Transport execute(Request request) {
if (channel == null) {
try {
connect();
awaitSettings();
} catch (InterruptedException e) {
return this;
}
}
setResponseListener(request.getResponseListener());
setExceptionListener(request.getExceptionListener());
setHeadersListener(request.getHeadersListener());
setCookieListener(request.getCookieListener());
setPushListener(request.getPushListener());
public Transport execute(Request request) throws IOException {
ensureConnect();
// some HTTP 1.1 servers like Elasticsearch do not understand full URIs in HTTP command line
String uri = request.httpVersion().majorVersion() < 2 ?
request.base().relativeReference() : request.base().toString();
@ -136,91 +100,45 @@ abstract class BaseTransport implements Transport {
*/
@Override
public <T> CompletableFuture<T> execute(Request request,
Function<FullHttpResponse, T> supplier) {
Function<FullHttpResponse, T> supplier) throws IOException {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
request.setExceptionListener(completableFuture::completeExceptionally);
//request.setExceptionListener(completableFuture::completeExceptionally);
request.setResponseListener(response -> completableFuture.complete(supplier.apply(response)));
execute(request);
return completableFuture;
}
@Override
public void close() {
public synchronized void close() {
get();
if (channel != null) {
channel.close();
channel = null;
}
}
@Override
public void setResponseListener(HttpResponseListener responseListener) {
if (responseListener != null) {
this.responseListener = responseListener;
protected void ensureConnect() throws IOException {
if (channel == null) {
try {
channel = client.newChannel(httpAddress);
channel.attr(TRANSPORT_ATTRIBUTE_KEY).set(this);
awaitSettings();
} catch (InterruptedException e) {
throw new ConnectException("unable to connect to " + httpAddress);
}
}
@Override
public HttpResponseListener getResponseListener() {
return responseListener;
}
@Override
public void setHeadersListener(HttpHeadersListener httpHeadersListener) {
if (httpHeadersListener != null) {
this.httpHeadersListener = httpHeadersListener;
}
}
@Override
public HttpHeadersListener getHeadersListener() {
return httpHeadersListener;
}
@Override
public void setCookieListener(CookieListener cookieListener) {
if (cookieListener != null) {
this.cookieListener = cookieListener;
}
}
@Override
public CookieListener getCookieListener() {
return cookieListener;
}
@Override
public void setExceptionListener(ExceptionListener exceptionListener) {
if (exceptionListener != null) {
this.exceptionListener = exceptionListener;
}
}
@Override
public ExceptionListener getExceptionListener() {
return exceptionListener;
}
@Override
public void setPushListener(HttpPushListener pushListener) {
if (pushListener != null) {
this.pushListener = pushListener;
}
}
@Override
public HttpPushListener getPushListener() {
return pushListener;
}
protected Request continuation(Integer streamId, FullHttpResponse httpResponse) throws URLSyntaxException {
if (httpResponse == null) {
return null;
}
try {
if (streamId == null) {
streamId = requests.lastKey();
Request request = fromStreamId(streamId);
if (request == null) {
// push promise
return null;
}
Request request = requests.get(streamId);
try {
if (request.checkRedirect()) {
int status = httpResponse.status().code();
switch (status) {
@ -238,20 +156,21 @@ abstract class BaseTransport implements Transport {
URL redirUrl = URL.base(request.base()).resolve(location);
HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod();
RequestBuilder newHttpRequestBuilder = Request.builder(method)
.setURL(redirUrl)
.url(redirUrl)
.setVersion(request.httpVersion())
.setHeaders(request.headers())
.setContent(request.content());
.content(request.content());
// TODO(jprante) convencience to copy pathAndQuery from one request to another
request.base().getQueryParams().forEach(pair ->
newHttpRequestBuilder.addParam(pair.getFirst(), pair.getSecond())
newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond())
);
request.cookies().forEach(newHttpRequestBuilder::addCookie);
Request newHttpRequest = newHttpRequestBuilder.build();
newHttpRequest.setResponseListener(request.getResponseListener());
newHttpRequest.setExceptionListener(request.getExceptionListener());
//newHttpRequest.setExceptionListener(request.getExceptionListener());
newHttpRequest.setHeadersListener(request.getHeadersListener());
newHttpRequest.setCookieListener(request.getCookieListener());
newHttpRequest.setPushListener(request.getPushListener());
//newHttpRequest.setPushListener(request.getPushListener());
StringBuilder hostAndPort = new StringBuilder();
hostAndPort.append(redirUrl.getHost());
if (redirUrl.getPort() != null) {
@ -275,6 +194,13 @@ abstract class BaseTransport implements Transport {
return null;
}
protected Request fromStreamId(Integer streamId) {
if (streamId == null) {
streamId = requests.lastKey();
}
return requests.get(streamId);
}
public void setCookieBox(Map<Cookie, Boolean> cookieBox) {
this.cookieBox = cookieBox;
}

View file

@ -6,12 +6,17 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
@ -36,11 +41,6 @@ public class Http2Transport extends BaseTransport implements Transport {
super(client, httpAddress);
streamIdCounter = new AtomicInteger(3);
streamidPromiseMap = new ConcurrentSkipListMap<>();
}
@Override
public void connect() throws InterruptedException {
super.connect();
settingsPromise = new CompletableFuture<>();
}
@ -86,34 +86,38 @@ public class Http2Transport extends BaseTransport implements Transport {
}
CompletableFuture<Boolean> promise = streamidPromiseMap.get(streamId);
if (promise == null) {
logger.log(Level.WARNING, "message received for unknown stream id " + streamId);
if (pushListener != null) {
pushListener.onPushReceived(null, fullHttpResponse);
}
logger.log(Level.WARNING, "response received for unknown stream id " + streamId);
} else {
Request request = fromStreamId(streamId);
if (request != null) {
HttpResponseListener responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(fullHttpResponse);
}
// forward?
try {
Request request = continuation(streamId, fullHttpResponse);
request = continuation(streamId, fullHttpResponse);
if (request != null) {
// synchronous call here
client.continuation(this, request);
}
} catch (URLSyntaxException e) {
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
// complete origin transport
}
// complete origin
promise.complete(true);
}
}
@Override
public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
Request request = fromStreamId(streamId);
if (request != null) {
HttpHeadersListener httpHeadersListener = request.getHeadersListener();
if (httpHeadersListener != null) {
httpHeadersListener.onHeaders(httpHeaders);
}
CookieListener cookieListener = request.getCookieListener();
if (cookieListener != null) {
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
@ -121,6 +125,13 @@ public class Http2Transport extends BaseTransport implements Transport {
}
}
}
}
@Override
public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) {
streamidPromiseMap.put(promisedStreamId, new CompletableFuture<>());
requests.put(promisedStreamId, fromStreamId(streamId));
}
@Override
public void awaitResponse(Integer streamId) {
@ -156,9 +167,6 @@ public class Http2Transport extends BaseTransport implements Transport {
@Override
public void fail(Throwable throwable) {
if (exceptionListener != null) {
exceptionListener.onException(throwable);
}
for (CompletableFuture<Boolean> promise : streamidPromiseMap.values()) {
promise.completeExceptionally(throwable);
}

View file

@ -6,12 +6,17 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.net.URLSyntaxException;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
@ -58,15 +63,19 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override
public void responseReceived(Integer streamId, FullHttpResponse fullHttpResponse) {
Request request = fromStreamId(streamId);
if (request != null) {
HttpResponseListener responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(fullHttpResponse);
}
}
try {
Request request = continuation(null, fullHttpResponse);
request = continuation(null, fullHttpResponse);
if (request != null) {
client.continuation(this, request);
}
} catch (URLSyntaxException e) {
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
if (!sequentialPromiseMap.isEmpty()) {
@ -79,17 +88,26 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override
public void headersReceived(Integer streamId, HttpHeaders httpHeaders) {
Request request = fromStreamId(streamId);
if (request != null) {
HttpHeadersListener httpHeadersListener = request.getHeadersListener();
if (httpHeadersListener != null) {
httpHeadersListener.onHeaders(httpHeaders);
}
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
addCookie(cookie);
CookieListener cookieListener = request.getCookieListener();
if (cookieListener != null) {
cookieListener.onCookie(cookie);
}
}
}
}
@Override
public void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers) {
}
@Override
public void awaitResponse(Integer streamId) {
@ -125,11 +143,9 @@ public class HttpTransport extends BaseTransport implements Transport {
@Override
public void fail(Throwable throwable) {
if (exceptionListener != null) {
exceptionListener.onException(throwable);
}
for (CompletableFuture<Boolean> promise : sequentialPromiseMap.values()) {
promise.completeExceptionally(throwable);
}
}
}

View file

@ -4,16 +4,13 @@ import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.AttributeKey;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@ -24,13 +21,9 @@ public interface Transport {
HttpAddress httpAddress();
void connect() throws InterruptedException;
Transport execute(Request request) throws IOException;
Transport execute(Request request);
<T> CompletableFuture<T> execute(Request request, Function<FullHttpResponse, T> supplier);
Channel channel();
<T> CompletableFuture<T> execute(Request request, Function<FullHttpResponse, T> supplier) throws IOException;
Integer nextStream();
@ -38,26 +31,6 @@ public interface Transport {
void awaitSettings();
void setResponseListener(HttpResponseListener responseListener);
HttpResponseListener getResponseListener();
void setExceptionListener(ExceptionListener exceptionListener);
ExceptionListener getExceptionListener();
void setHeadersListener(HttpHeadersListener headersListener);
HttpHeadersListener getHeadersListener();
void setPushListener(HttpPushListener pushListener);
HttpPushListener getPushListener();
void setCookieListener(CookieListener cookieListener);
CookieListener getCookieListener();
void setCookieBox(Map<Cookie, Boolean> cookieBox);
Map<Cookie, Boolean> getCookieBox();
@ -66,6 +39,8 @@ public interface Transport {
void headersReceived(Integer streamId, HttpHeaders httpHeaders);
void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers);
void awaitResponse(Integer streamId);
Transport get();

View file

@ -1,57 +0,0 @@
package org.xbib.netty.http.client.test;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.test.LoggingBase;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
@Ignore
public class AkamaiTest extends LoggingBase {
private static final Logger logger = Logger.getLogger("");
/**
* h2_demo_frame.html fails with:
* 2018-02-27 23:43:32.048 INFORMATION [client] io.netty.handler.codec.http2.Http2FrameLogger
* logRstStream [id: 0x4fe29f1e, L:/192.168.178.23:49429 - R:http2.akamai.com/104.94.191.203:443]
* INBOUND RST_STREAM: streamId=2 errorCode=8
* 2018-02-27 23:43:32.049 SCHWERWIEGEND [] org.xbib.netty.http.client.test.a.AkamaiTest lambda$testAkamaiHttps$0
* HTTP/2 to HTTP layer caught stream reset
* io.netty.handler.codec.http2.Http2Exception$StreamException: HTTP/2 to HTTP layer caught stream reset
*
* TODO(jprante) catch all promised pushes
*/
@Test
public void testAkamaiHttps() {
Client client = new Client();
try {
Request request = Request.get()
//.setURL("https://http2.akamai.com/demo/h2_demo_frame.html")
.setURL("https://http2.akamai.com/")
.setVersion("HTTP/2.0")
.build()
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status()
+ " response body = " + response);
})
.setPushListener((requestHeaders, fullHttpResponse) -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "received push: request headers = " + requestHeaders
+ " status = " + fullHttpResponse.status()
+ " response headers = " + fullHttpResponse.headers().entries()
+ " response body = " + response
);
});
client.execute(request).get();
} finally {
client.shutdownGracefully();
}
}
}

View file

@ -1,181 +0,0 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.transport.Transport;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ClientTest {
private static final Logger logger = Logger.getLogger(ClientTest.class.getName());
@Test
public void testHttp1() throws Exception {
Client client = new Client();
try {
Transport transport = client.newTransport(HttpAddress.http1("fl.hbz-nrw.de"));
transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
transport.connect();
transport.awaitSettings();
simpleRequest(transport);
transport.get();
transport.close();
} finally {
client.shutdown();
}
}
@Test
public void testHttp1ParallelRequests() {
Client client = new Client();
try {
Request request1 = Request.builder(HttpMethod.GET)
.setURL("http://fl.hbz-nrw.de").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
Request request2 = Request.builder(HttpMethod.GET)
.setURL("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request1);
client.execute(request2);
} finally {
client.shutdownGracefully();
}
}
@Test
public void testHttp2() throws Exception {
String host = "webtide.com";
Client client = new Client();
client.logDiagnostics(Level.INFO);
try {
Transport transport = client.newTransport(HttpAddress.http2(host));
transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
transport.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
transport.connect();
transport.awaitSettings();
simpleRequest(transport);
transport.get();
transport.close();
} finally {
client.shutdown();
}
}
@Test
public void testHttp2Request() {
//String url = "https://webtide.com";
String url = "https://http2-push.io";
// TODO register push announces into promises in order to wait for them all.
Client client = new Client();
try {
Request request = Request.builder(HttpMethod.GET)
.setURL(url).setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()))
.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8))
);
client.execute(request).get();
} finally {
client.shutdownGracefully();
}
}
@Test
public void testHttp2TwoRequestsOnSameConnection() {
Client client = new Client();
try {
Request request1 = Request.builder(HttpMethod.GET)
.setURL("https://webtide.com").setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()))
.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " +
msg.headers().entries()
//msg.content().toString(StandardCharsets.UTF_8))
));
Request request2 = Request.builder(HttpMethod.GET)
.setURL("https://webtide.com/why-choose-jetty/").setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()))
.setPushListener((hdrs, msg) -> logger.log(Level.INFO, "got push: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request1).execute(request2);
} finally {
client.shutdownGracefully();
}
}
@Test
public void testTwoTransports() throws Exception {
Client client = Client.builder().enableDebug().build();
try {
Transport transport = client.newTransport(HttpAddress.http1("xbib.org"));
transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
transport.connect();
transport.awaitSettings();
simpleRequest(transport);
transport.get();
transport.close();
transport = client.newTransport(HttpAddress.http2("google.com"));
transport.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
transport.connect();
transport.awaitSettings();
simpleRequest(transport);
transport.get();
transport.close();
} finally {
client.shutdown();
}
}
private void simpleRequest(Transport transport) {
transport.execute(Request.builder(HttpMethod.GET)
.setVersion(transport.httpAddress().getVersion())
.setURL(transport.httpAddress().base()).build());
}
}

View file

@ -5,6 +5,7 @@ import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@ -19,23 +20,28 @@ public class CompletableFutureTest {
* Get some weird content from one URL and post it to another URL, by composing completable futures.
*/
@Test
public void testComposeCompletableFutures() {
public void testComposeCompletableFutures() throws IOException {
Client client = new Client();
try {
final Function<FullHttpResponse, String> httpResponseStringFunction = response ->
response.content().toString(StandardCharsets.UTF_8);
Request request = Request.get()
.setURL("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml")
.url("http://alkmene.hbz-nrw.de/repository/org/xbib/content/2.0.0-SNAPSHOT/maven-metadata-local.xml")
.build();
CompletableFuture<String> completableFuture = client.execute(request, httpResponseStringFunction)
.exceptionally(Throwable::getMessage)
.thenCompose(content -> {
logger.log(Level.INFO, content);
// POST is not allowed, we don't care
try {
return client.execute(Request.post()
.setURL("http://google.com/")
.addParam("query", content)
.url("http://google.com/")
.addParameter("query", content)
.build(), httpResponseStringFunction);
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
return null;
}
});
String result = completableFuture.join();
logger.log(Level.INFO, "completablefuture result = " + result);

View file

@ -5,6 +5,7 @@ import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -14,7 +15,7 @@ public class ConscryptTest extends LoggingBase {
private static final Logger logger = Logger.getLogger("");
@Test
public void testConscrypt() {
public void testConscrypt() throws IOException {
Client client = Client.builder()
.enableDebug()
.setJdkSslProvider()
@ -23,10 +24,9 @@ public class ConscryptTest extends LoggingBase {
logger.log(Level.INFO, client.getClientConfig().toString());
try {
Request request = Request.get()
.setURL("https://fl-test.hbz-nrw.de")
.url("https://fl-test.hbz-nrw.de")
.setVersion("HTTP/2.0")
.build()
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status()

View file

@ -4,15 +4,16 @@ import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*/
public class HttpBinTest extends LoggingBase {
public class CookieSetterHttpBinTest extends LoggingBase {
private static final Logger logger = Logger.getLogger(HttpBinTest.class.getName());
private static final Logger logger = Logger.getLogger(CookieSetterHttpBinTest.class.getName());
/**
* Test httpbin.org "Set-Cookie:" header after redirection of URL.
@ -28,15 +29,14 @@ public class HttpBinTest extends LoggingBase {
* @throws Exception
*/
@Test
public void testHttpBinCookies() {
public void testHttpBinCookies() throws IOException {
Client client = new Client();
try {
Request request = Request.get()
.setURL("http://httpbin.org/cookies/set?name=value")
.url("http://httpbin.org/cookies/set?name=value")
.build()
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.setCookieListener(cookie -> logger.log(Level.INFO, "this is the cookie " + cookie.toString()))
.setHeadersListener(headers -> logger.log(Level.INFO, headers.toString()))
.setCookieListener(cookie -> logger.log(Level.INFO, "this is the cookie: " + cookie.toString()))
.setHeadersListener(headers -> logger.log(Level.INFO, "headers = " + headers.entries().toString()))
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);

View file

@ -6,6 +6,7 @@ import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.transport.Transport;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -21,17 +22,16 @@ public class ElasticsearchTest extends LoggingBase {
private static final Logger logger = Logger.getLogger(ElasticsearchTest.class.getName());
@Test
public void testElasticsearchCreateDocument() {
public void testElasticsearchCreateDocument() throws IOException {
Client client = new Client();
try {
Request request = Request.put().setURL("http://localhost:9200/test/test/1")
Request request = Request.put().url("http://localhost:9200/test/test/1")
.json("{\"text\":\"Hello World\"}")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e));
});
client.execute(request);
} finally {
client.shutdownGracefully();
@ -39,17 +39,16 @@ public class ElasticsearchTest extends LoggingBase {
}
@Test
public void testElasticsearchMatchQuery() {
public void testElasticsearchMatchQuery() throws IOException {
Client client = new Client();
try {
Request request = Request.post().setURL("http://localhost:9200/test/_search")
Request request = Request.post().url("http://localhost:9200/test/_search")
.json("{\"query\":{\"match\":{\"text\":\"Hello World\"}}}")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e));
});
client.execute(request).get();
} finally {
client.shutdownGracefully();
@ -57,7 +56,7 @@ public class ElasticsearchTest extends LoggingBase {
}
@Test
public void testElasticsearchConcurrent() {
public void testElasticsearchConcurrent() throws IOException {
Client client = Client.builder().setReadTimeoutMillis(20000).build();
int max = 1000;
try {
@ -78,15 +77,14 @@ public class ElasticsearchTest extends LoggingBase {
private Request newRequest() {
return Request.post()
.setURL("http://localhost:9200/test/_search")
.url("http://localhost:9200/test/_search")
.json("{\"query\":{\"match\":{\"text\":\"Hello World\"}}}")
.addHeader("connection", "keep-alive")
.build()
.setResponseListener(fullHttpResponse ->
logger.log(Level.FINE, "status = " + fullHttpResponse.status() +
" counter = " + count.incrementAndGet() +
" response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8)))
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e));
" response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8)));
}
private final AtomicInteger count = new AtomicInteger();

View file

@ -0,0 +1,76 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Http1Test {
private static final Logger logger = Logger.getLogger(Http1Test.class.getName());
@Test
public void testHttp1() throws Exception {
Client client = new Client();
try {
Request request = Request.get().url("http://fl.hbz-nrw.de").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request).get();
} finally {
client.shutdown();
}
}
@Test
public void testHttp1ParallelRequests() throws IOException {
Client client = new Client();
try {
Request request1 = Request.builder(HttpMethod.GET)
.url("http://fl.hbz-nrw.de").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
Request request2 = Request.builder(HttpMethod.GET)
.url("http://fl.hbz-nrw.de/app/fl/").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request1);
client.execute(request2);
} finally {
client.shutdownGracefully();
}
}
@Test
public void testTwoTransports() throws Exception {
Client client = Client.builder().enableDebug().build();
try {
Request request1 = Request.get().url("http://xbib.org").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
client.execute(request1).get();
Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
client.execute(request2).get();
} finally {
client.shutdown();
}
}
}

View file

@ -0,0 +1,104 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Http2Test {
private static final Logger logger = Logger.getLogger(Http2Test.class.getName());
/**
*/
@Test
public void testAkamai() throws IOException {
Client client = Client.builder().enableDebug().build();
try {
Request request = Request.get()
.url("https://http2.akamai.com/demo/h2_demo_frame.html")
//.url("https://http2.akamai.com/")
.setVersion("HTTP/2.0")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status()
+ " response body = " + response);
});
client.execute(request).get();
} finally {
client.shutdownGracefully();
}
}
@Test
public void testWebtide() throws Exception {
Client client = Client.builder().enableDebug().build();
client.logDiagnostics(Level.INFO);
try {
Request request = Request.get().url("https://webtide.com").setVersion("HTTP/2.0").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request).get();
} finally {
client.shutdown();
}
}
@Test
public void testHttp2PushIO() throws IOException {
//String url = "https://webtide.com";
String url = "https://http2-push.io";
// TODO register push announces into promises in order to wait for them all.
Client client = Client.builder().enableDebug().build();
try {
Request request = Request.builder(HttpMethod.GET)
.url(url).setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request).get();
} finally {
client.shutdownGracefully();
}
}
@Test
public void testWebtideTwoRequestsOnSameConnection() {
Client client = new Client();
try {
Request request1 = Request.builder(HttpMethod.GET)
.url("https://webtide.com").setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
Request request2 = Request.builder(HttpMethod.GET)
.url("https://webtide.com/why-choose-jetty/").setVersion("HTTP/2.0")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request1).execute(request2);
} catch (IOException e) {
//
} finally {
client.shutdownGracefully();
}
}
}

View file

@ -0,0 +1,19 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.Test;
import org.xbib.netty.http.client.Request;
import java.util.logging.Level;
import java.util.logging.Logger;
public class RequestBuilderTest {
private static final Logger logger = Logger.getLogger(RequestBuilderTest.class.getName());
@Test
public void testSimpleRequest() {
Request request = Request.builder(HttpMethod.GET).build();
logger.log(Level.INFO, request.toString());
}
}

View file

@ -25,13 +25,13 @@ public class URITest {
@Test
public void testRequestURIs() {
RequestBuilder httpRequestBuilder = Request.get();
httpRequestBuilder.setURL("https://localhost").path("/path");
httpRequestBuilder.url("https://localhost").uri("/path");
assertEquals("/path", httpRequestBuilder.build().relativeUri());
httpRequestBuilder.path("/foobar");
httpRequestBuilder.uri("/foobar");
assertEquals("/foobar", httpRequestBuilder.build().relativeUri());
httpRequestBuilder.path("/path1?a=b");
httpRequestBuilder.uri("/path1?a=b");
assertEquals("/path1?a=b", httpRequestBuilder.build().relativeUri());
httpRequestBuilder.path("/path2?c=d");
httpRequestBuilder.uri("/path2?c=d");
assertEquals("/path2?c=d", httpRequestBuilder.build().relativeUri());
}
}

View file

@ -5,8 +5,8 @@ import io.netty.handler.proxy.HttpProxyHandler;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.test.LoggingBase;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
@ -19,10 +19,10 @@ public class XbibTest extends LoggingBase {
private static final Logger logger = Logger.getLogger("");
@Test
public void testXbibOrgWithDefaults() {
public void testXbibOrgWithDefaults() throws IOException {
Client client = new Client();
try {
Request request = Request.get().setURL("http://xbib.org")
Request request = Request.get().url("http://xbib.org")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
@ -35,21 +35,28 @@ public class XbibTest extends LoggingBase {
}
@Test
public void testXbibOrgWithCompletableFuture() {
public void testXbibOrgWithCompletableFuture() throws IOException {
Client httpClient = Client.builder()
.setTcpNodelay(true)
.build();
try {
final Function<FullHttpResponse, String> httpResponseStringFunction =
response -> response.content().toString(StandardCharsets.UTF_8);
Request request = Request.get().setURL("http://xbib.org")
Request request = Request.get().url("http://xbib.org")
.build();
final CompletableFuture<String> completableFuture = httpClient.execute(request, httpResponseStringFunction)
.exceptionally(Throwable::getMessage)
.thenCompose(content -> httpClient.execute(Request.post()
.setURL("http://google.de")
.addParam("query", content)
.build(), httpResponseStringFunction));
.thenCompose(content -> {
try {
return httpClient.execute(Request.post()
.url("http://google.de")
.addParameter("query", content)
.build(), httpResponseStringFunction);
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
return null;
}
});
String result = completableFuture.join();
logger.info("result = " + result);
} finally {
@ -58,7 +65,7 @@ public class XbibTest extends LoggingBase {
}
@Test
public void testXbibOrgWithProxy() {
public void testXbibOrgWithProxy() throws IOException {
Client httpClient = Client.builder()
.setHttpProxyHandler(new HttpProxyHandler(new InetSocketAddress("80.241.223.251", 8080)))
.setConnectTimeoutMillis(30000)
@ -66,13 +73,12 @@ public class XbibTest extends LoggingBase {
.build();
try {
httpClient.execute(Request.get()
.setURL("http://xbib.org")
.url("http://xbib.org")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)))
}))
.get();
} finally {
httpClient.shutdownGracefully();
@ -80,19 +86,18 @@ public class XbibTest extends LoggingBase {
}
@Test
public void testXbibOrgWithVeryShortReadTimeout() {
public void testXbibOrgWithVeryShortReadTimeout() throws IOException {
Client httpClient = Client.builder()
.setReadTimeoutMillis(50)
.build();
try {
httpClient.execute(Request.get()
.setURL("http://xbib.org")
.url("http://xbib.org")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e)))
}))
.get();
} finally {
httpClient.shutdownGracefully();
@ -100,14 +105,13 @@ public class XbibTest extends LoggingBase {
}
@Test
public void testXbibTwoSequentialRequests() {
public void testXbibTwoSequentialRequests() throws IOException {
Client httpClient = new Client();
try {
httpClient.execute(Request.get()
.setVersion("HTTP/1.1")
.setURL("http://xbib.org")
.url("http://xbib.org")
.build()
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -116,9 +120,8 @@ public class XbibTest extends LoggingBase {
httpClient.execute(Request.get()
.setVersion("HTTP/1.1")
.setURL("http://xbib.org")
.url("http://xbib.org")
.build()
.setExceptionListener(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);

View file

@ -0,0 +1,132 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
public class EpollTest {
private static final Logger logger = Logger.getLogger(EpollTest.class.getName());
private static final int CONCURRENCY = 10;
private static final List<HttpAddress> NODES =
Collections.singletonList(HttpAddress.http1("localhost", 12345));
private static final long TEST_TIME_SECONDS = 100;
private static final int ATTEMPTS = 10_000;
private static final int FAIL_EVERY_ATTEMPT = 10;
private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000);
private MockEpollServer mockEpollServer;
private Pool<Channel> channelPool;
private EventLoopGroup eventLoopGroup;
@Before
public void setUp() throws Exception {
mockEpollServer = new MockEpollServer(12345, FAIL_EVERY_ATTEMPT);
Semaphore semaphore = new Semaphore(CONCURRENCY);
eventLoopGroup = new EpollEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(EpollSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new DummyClientChannelHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0);
channelPool.prepare(CONCURRENCY);
}
@After
public void tearDown() throws Exception {
channelPool.close();
eventLoopGroup.shutdownGracefully();
mockEpollServer.close();
}
@Ignore
@Test
public void testPoolEpoll() throws Exception {
LongAdder longAdder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);
for(int i = 0; i < CONCURRENCY; i ++) {
executor.submit(() -> {
Channel channel;
for(int j = 0; j < ATTEMPTS; j ++) {
try {
while ((channel = channelPool.acquire()) == null) {
Thread.sleep(1); // very short?
}
channel.writeAndFlush(PAYLOAD.retain()).sync();
channelPool.release(channel);
longAdder.increment();
} catch (InterruptedException e) {
break;
} catch (Throwable cause) {
logger.log(Level.WARNING, cause.getMessage(), cause);
}
}
});
}
executor.shutdown();
executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS);
assertTrue(executor.isTerminated());
assertEquals(CONCURRENCY * ATTEMPTS, longAdder.sum(),
2 * CONCURRENCY * ATTEMPTS / FAIL_EVERY_ATTEMPT);
}
class DummyClientChannelHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
logger.log(Level.WARNING, cause.getMessage(), cause);
}
}
}

View file

@ -0,0 +1,64 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MockEpollServer implements Closeable {
private static final Logger logger = Logger.getLogger(MockEpollServer.class.getName());
private final EventLoopGroup dispatchGroup;
private final EventLoopGroup workerGroup;
private final ChannelFuture bindFuture;
private final AtomicLong reqCounter;
public MockEpollServer(int port, int dropEveryRequest) throws InterruptedException {
dispatchGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup();
reqCounter = new AtomicLong(0);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(dispatchGroup, workerGroup)
.channel(EpollServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
if (dropEveryRequest > 0) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (reqCounter.incrementAndGet() % dropEveryRequest == 0) {
Channel channel = ctx.channel();
logger.log(Level.INFO,"dropping the connection " + channel);
channel.close();
}
}
});
}
}
});
bindFuture = bootstrap.bind(port).sync();
}
@Override
public void close() {
bindFuture.channel().close();
workerGroup.shutdownGracefully();
dispatchGroup.shutdownGracefully();
}
}

View file

@ -0,0 +1,62 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MockNioServer implements Closeable {
private static final Logger logger = Logger.getLogger(MockNioServer.class.getName());
private final EventLoopGroup dispatchGroup;
private final EventLoopGroup workerGroup;
private final ChannelFuture bindFuture;
private final AtomicLong reqCounter;
public MockNioServer(int port, int dropEveryRequest) throws InterruptedException {
dispatchGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
reqCounter = new AtomicLong(0);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(dispatchGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (reqCounter.incrementAndGet() % dropEveryRequest == 0) {
Channel channel = ctx.channel();
logger.log(Level.INFO, "dropping the connection " + channel);
channel.close();
}
}
});
}
});
bindFuture = bootstrap.bind(port).sync();
}
@Override
public void close() {
bindFuture.channel().close();
workerGroup.shutdownGracefully();
dispatchGroup.shutdownGracefully();
}
}

View file

@ -0,0 +1,128 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class NioTest {
private static final Logger logger = Logger.getLogger(NioTest.class.getName());
private static final int CONCURRENCY = 10;
private static final List<HttpAddress> NODES =
Collections.singletonList(HttpAddress.http1("localhost", 12345));
private static final long TEST_TIME_SECONDS = 100;
private static final int ATTEMPTS = 10_000;
private static final int FAIL_EVERY_ATTEMPT = 10;
private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000);
private MockNioServer mockNioServer;
private Pool<Channel> channelPool;
private EventLoopGroup eventLoopGroup;
@Before
public void setUp() throws Exception {
mockNioServer = new MockNioServer(12345, FAIL_EVERY_ATTEMPT);
Semaphore semaphore = new Semaphore(CONCURRENCY);
eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new DummyClientChannelHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
channelPool = new SimpleChannelPool<>(semaphore, NODES, bootstrap, null, 0);
channelPool.prepare(CONCURRENCY);
}
@After
public void tearDown() throws Exception {
channelPool.close();
eventLoopGroup.shutdownGracefully();
mockNioServer.close();
}
@Test
public void testPoolNio() throws Exception {
LongAdder longAdder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);
for(int i = 0; i < CONCURRENCY; i ++) {
executor.submit(() -> {
Channel channel;
for(int j = 0; j < ATTEMPTS; j ++) {
try {
while ((channel = channelPool.acquire()) == null) {
Thread.sleep(1);
}
channel.writeAndFlush(PAYLOAD.retain()).sync();
channelPool.release(channel);
longAdder.increment();
} catch (InterruptedException e) {
break;
} catch (Throwable cause) {
logger.log(Level.WARNING, cause.getMessage(), cause);
}
}
});
}
executor.shutdown();
executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS);
assertTrue(executor.isTerminated());
assertEquals(CONCURRENCY * ATTEMPTS, longAdder.sum(),
2 * CONCURRENCY * ATTEMPTS / FAIL_EVERY_ATTEMPT);
}
class DummyClientChannelHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
logger.log(Level.WARNING, cause.getMessage(), cause);
}
}
}

View file

@ -0,0 +1,124 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.util.AttributeKey;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.pool.SimpleChannelPool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class SimplePoolTest {
private static final Logger logger = Logger.getLogger(SimplePoolTest.class.getName());
private static final int TEST_STEP_TIME_SECONDS = 50;
private static final int BATCH_SIZE = 0x1000;
private int nodeCount;
private ConcurrentMap<HttpAddress, LongAdder> nodeFreq = new ConcurrentHashMap<>();
@Parameterized.Parameters
public static Collection<Object[]> generateData() {
return Arrays.asList(new Object[][] {
{1, 1},
{10, 1}, {10, 2}, {10, 5}, {10, 10},
{100, 1}, {100, 2}, {100, 5}, {100, 10},
{1000, 1}, {1000, 2}, {1000, 5}, {1000, 10}
});
}
public SimplePoolTest(int concurrencyLevel, int nodeCount) {
this.nodeCount = nodeCount;
List<HttpAddress> nodes = new ArrayList<>();
for (int i = 0; i < nodeCount; i ++) {
nodes.add(HttpAddress.http1("localhost" + i));
}
try (Pool<Channel> pool = new SimpleChannelPool<>(new Semaphore(concurrencyLevel), nodes, new Bootstrap(),
null, 0)) {
int n = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(n);
for(int i = 0; i < n; i ++) {
executorService.submit(() -> {
Thread currThread = Thread.currentThread();
List<Channel> channels = new ArrayList<>(BATCH_SIZE);
int j;
int k;
Channel channel;
try {
while (!currThread.isInterrupted()) {
for (j = 0; j < BATCH_SIZE; j ++) {
channel = pool.acquire();
if (channel == null) {
break;
}
AttributeKey<HttpAddress> attributeKey = AttributeKey.valueOf("poolKey");
nodeFreq.computeIfAbsent(channel.attr(attributeKey).get(), node -> new LongAdder()).increment();
channels.add(channel);
}
for (k = 0; k < j; k ++) {
pool.release(channels.get(k));
}
channels.clear();
}
} catch (Exception ignored) {
//
}
});
}
executorService.shutdown();
try {
executorService.awaitTermination(TEST_STEP_TIME_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
executorService.shutdownNow();
} catch (Throwable t) {
logger.log(Level.WARNING, t.getMessage(), t);
} finally {
long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
logger.log(Level.INFO, "concurrency = " + concurrencyLevel + ", nodes = " + nodeCount + " -> rate: " +
connCountSum / TEST_STEP_TIME_SECONDS);
}
}
@Test
public void testNodeFrequency() {
if (nodeCount > 1) {
long connCountSum = nodeFreq.values().stream().mapToLong(LongAdder::sum).sum();
long avgConnCountPerNode = connCountSum / nodeCount;
for (HttpAddress nodeAddr: nodeFreq.keySet()) {
assertTrue(nodeFreq.get(nodeAddr).sum() > 0);
assertEquals("Node count: " + nodeCount + ", node: " + nodeAddr
+ ", expected connection count: " + avgConnCountPerNode + ", actual: "
+ nodeFreq.get(nodeAddr).sum(),
avgConnCountPerNode, nodeFreq.get(nodeAddr).sum(), 1.5 * avgConnCountPerNode);
}
} else {
assertTrue(true);
}
}
}

View file

@ -0,0 +1,156 @@
package org.xbib.netty.http.client.test.retry;
import org.junit.Test;
import org.xbib.netty.http.client.retry.BackOff;
import org.xbib.netty.http.client.retry.ExponentialBackOff;
import org.xbib.netty.http.client.retry.NanoClock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests {@link ExponentialBackOff}.
*/
public class ExponentialBackOffTest {
@Test
public void testConstructor() {
ExponentialBackOff backOffPolicy = new ExponentialBackOff();
assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS,
backOffPolicy.getInitialIntervalMillis());
assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS,
backOffPolicy.getCurrentIntervalMillis());
assertEquals(ExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR,
backOffPolicy.getRandomizationFactor(), 1);
assertEquals(ExponentialBackOff.DEFAULT_MULTIPLIER, backOffPolicy.getMultiplier(), 1);
assertEquals(
ExponentialBackOff.DEFAULT_MAX_INTERVAL_MILLIS, backOffPolicy.getMaxIntervalMillis());
assertEquals(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS,
backOffPolicy.getMaxElapsedTimeMillis());
}
@Test
public void testBuilder() {
ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder().build();
assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS,
backOffPolicy.getInitialIntervalMillis());
assertEquals(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL_MILLIS,
backOffPolicy.getCurrentIntervalMillis());
assertEquals(ExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR,
backOffPolicy.getRandomizationFactor(), 1);
assertEquals(ExponentialBackOff.DEFAULT_MULTIPLIER, backOffPolicy.getMultiplier(), 1);
assertEquals(ExponentialBackOff.DEFAULT_MAX_INTERVAL_MILLIS, backOffPolicy.getMaxIntervalMillis());
assertEquals(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS,
backOffPolicy.getMaxElapsedTimeMillis());
int testInitialInterval = 1;
double testRandomizationFactor = 0.1;
double testMultiplier = 5.0;
int testMaxInterval = 10;
int testMaxElapsedTime = 900000;
backOffPolicy = new ExponentialBackOff.Builder()
.setInitialIntervalMillis(testInitialInterval)
.setRandomizationFactor(testRandomizationFactor)
.setMultiplier(testMultiplier)
.setMaxIntervalMillis(testMaxInterval)
.setMaxElapsedTimeMillis(testMaxElapsedTime)
.build();
assertEquals(testInitialInterval, backOffPolicy.getInitialIntervalMillis());
assertEquals(testInitialInterval, backOffPolicy.getCurrentIntervalMillis());
assertEquals(testRandomizationFactor, backOffPolicy.getRandomizationFactor(), 1);
assertEquals(testMultiplier, backOffPolicy.getMultiplier(), 1);
assertEquals(testMaxInterval, backOffPolicy.getMaxIntervalMillis());
assertEquals(testMaxElapsedTime, backOffPolicy.getMaxElapsedTimeMillis());
}
@Test
public void testBackOff() {
int testInitialInterval = 500;
double testRandomizationFactor = 0.1;
double testMultiplier = 2.0;
int testMaxInterval = 5000;
int testMaxElapsedTime = 900000;
ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder()
.setInitialIntervalMillis(testInitialInterval)
.setRandomizationFactor(testRandomizationFactor)
.setMultiplier(testMultiplier)
.setMaxIntervalMillis(testMaxInterval)
.setMaxElapsedTimeMillis(testMaxElapsedTime)
.build();
int[] expectedResults = {500, 1000, 2000, 4000, 5000, 5000, 5000, 5000, 5000, 5000};
for (int expected : expectedResults) {
assertEquals(expected, backOffPolicy.getCurrentIntervalMillis());
// Assert that the next back off falls in the expected range.
int minInterval = (int) (expected - (testRandomizationFactor * expected));
int maxInterval = (int) (expected + (testRandomizationFactor * expected));
long actualInterval = backOffPolicy.nextBackOffMillis();
assertTrue(minInterval <= actualInterval && actualInterval <= maxInterval);
}
}
@Test
public void testGetRandomizedInterval() {
// 33% chance of being 1.
assertEquals(1, ExponentialBackOff.getRandomValueFromInterval(0.5, 0, 2));
assertEquals(1, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.33, 2));
// 33% chance of being 2.
assertEquals(2, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.34, 2));
assertEquals(2, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.66, 2));
// 33% chance of being 3.
assertEquals(3, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.67, 2));
assertEquals(3, ExponentialBackOff.getRandomValueFromInterval(0.5, 0.99, 2));
}
@Test
public void testGetElapsedTimeMillis() {
ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder().setNanoClock(new MyNanoClock()).build();
long elapsedTimeMillis = backOffPolicy.getElapsedTimeMillis();
assertEquals("elapsedTimeMillis=" + elapsedTimeMillis, 1000, elapsedTimeMillis);
}
@Test
public void testMaxElapsedTime() {
ExponentialBackOff backOffPolicy =
new ExponentialBackOff.Builder().setNanoClock(new MyNanoClock(10000)).build();
assertTrue(backOffPolicy.nextBackOffMillis() != BackOff.STOP);
// Change the currentElapsedTimeMillis to be 0 ensuring that the elapsed time will be greater
// than the max elapsed time.
backOffPolicy.setStartTimeNanos(0);
assertEquals(BackOff.STOP, backOffPolicy.nextBackOffMillis());
}
@Test
public void testBackOffOverflow() {
int testInitialInterval = Integer.MAX_VALUE / 2;
double testMultiplier = 2.1;
int testMaxInterval = Integer.MAX_VALUE;
ExponentialBackOff backOffPolicy = new ExponentialBackOff.Builder()
.setInitialIntervalMillis(testInitialInterval)
.setMultiplier(testMultiplier)
.setMaxIntervalMillis(testMaxInterval)
.build();
backOffPolicy.nextBackOffMillis();
// Assert that when an overflow is possible the current interval is set to the max interval.
assertEquals(testMaxInterval, backOffPolicy.getCurrentIntervalMillis());
}
static class MyNanoClock implements NanoClock {
private int i = 0;
private long startSeconds;
MyNanoClock() {
}
MyNanoClock(long startSeconds) {
this.startSeconds = startSeconds;
}
public long nanoTime() {
return (startSeconds + i++) * 1000000000;
}
}
}

View file

@ -0,0 +1,75 @@
package org.xbib.netty.http.client.test.retry;
import org.xbib.netty.http.client.retry.BackOff;
import java.io.IOException;
/**
* Mock for {@link BackOff} that always returns a fixed number.
*
* <p>
* Implementation is not thread-safe.
* </p>
*
*/
public class MockBackOff implements BackOff {
/** Fixed back-off milliseconds. */
private long backOffMillis;
/** Maximum number of tries before returning {@link #STOP}. */
private int maxTries = 10;
/** Number of tries so far. */
private int numTries;
public void reset() throws IOException {
numTries = 0;
}
public long nextBackOffMillis() throws IOException {
if (numTries >= maxTries || backOffMillis == STOP) {
return STOP;
}
numTries++;
return backOffMillis;
}
/**
* Sets the fixed back-off milliseconds (defaults to {@code 0}).
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public MockBackOff setBackOffMillis(long backOffMillis) {
//Preconditions.checkArgument(backOffMillis == STOP || backOffMillis >= 0);
this.backOffMillis = backOffMillis;
return this;
}
/**
* Sets the maximum number of tries before returning {@link #STOP} (defaults to {@code 10}).
*
* <p>
* Overriding is only supported for the purpose of calling the super implementation and changing
* the return type, but nothing else.
* </p>
*/
public MockBackOff setMaxTries(int maxTries) {
//Preconditions.checkArgument(maxTries >= 0);
this.maxTries = maxTries;
return this;
}
/** Returns the maximum number of tries before returning {@link #STOP}. */
public final int getMaxTries() {
return numTries;
}
/** Returns the number of tries so far. */
public final int getNumberOfTries() {
return numTries;
}
}

View file

@ -0,0 +1,27 @@
package org.xbib.netty.http.client.test.retry;
import org.junit.Test;
import org.xbib.netty.http.client.retry.BackOff;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
/**
* Tests {@link MockBackOff}.
*/
public class MockBackOffTest {
@Test
public void testNextBackOffMillis() throws IOException {
subtestNextBackOffMillis(0, new MockBackOff());
subtestNextBackOffMillis(BackOff.STOP, new MockBackOff().setBackOffMillis(BackOff.STOP));
subtestNextBackOffMillis(42, new MockBackOff().setBackOffMillis(42));
}
private void subtestNextBackOffMillis(long expectedValue, BackOff backOffPolicy) throws IOException {
for (int i = 0; i < 10; i++) {
assertEquals(expectedValue, backOffPolicy.nextBackOffMillis());
}
}
}