back to 4.1.16, fix SSL, fix pool, fix tests

This commit is contained in:
Jörg Prante 2018-03-09 00:37:08 +01:00
parent 5cadb716bc
commit 12a5f6418c
23 changed files with 343 additions and 215 deletions

View file

@ -67,7 +67,7 @@ test {
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
}
testLogging {
showStandardStreams = true
showStandardStreams = false
exceptionFormat = 'full'
}
}

View file

@ -1,8 +1,8 @@
group = org.xbib
name = netty-http-client
version = 4.1.22.2
version = 4.1.16.1
netty.version = 4.1.22.Final
netty.version = 4.1.16.Final
tcnative.version = 2.0.7.Final
conscrypt.version = 1.0.1
xbib-net-url.version = 1.1.0

View file

@ -40,6 +40,7 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.KeyStoreException;
import java.util.ArrayList;
import java.util.Collections;
@ -136,7 +137,7 @@ public final class Client {
this.http2SettingsHandler = new Http2SettingsHandler();
this.http2ResponseHandler = new Http2ResponseHandler();
this.transports = new CopyOnWriteArrayList<>();
if (hasPooledConnections()) {
if (!clientConfig.getPoolNodes().isEmpty()) {
List<HttpAddress> nodes = clientConfig.getPoolNodes();
Integer limit = clientConfig.getPoolNodeConnectionLimit();
if (limit == null || limit < 1) {
@ -183,7 +184,7 @@ public final class Client {
}
public boolean hasPooledConnections() {
return !clientConfig.getPoolNodes().isEmpty();
return pool != null && !clientConfig.getPoolNodes().isEmpty();
}
public BoundedChannelPool<HttpAddress> getPool() {
@ -222,13 +223,13 @@ public final class Client {
} else {
transport = new Http2Transport(this, null);
}
} else {
throw new IllegalStateException();
}
if (transport != null) {
if (transportListener != null) {
transportListener.onOpen(transport);
}
transports.add(transport);
if (transportListener != null) {
transportListener.onOpen(transport);
}
transports.add(transport);
return transport;
}
@ -270,15 +271,15 @@ public final class Client {
}
public void releaseChannel(Channel channel) throws IOException{
if (hasPooledConnections()) {
try {
pool.release(channel);
} catch (Exception e) {
throw new IOException(e);
}
} else {
if (channel != null) {
channel.close();
if (channel != null) {
if (hasPooledConnections()) {
try {
pool.release(channel);
} catch (Exception e) {
throw new IOException(e);
}
} else {
channel.close();
}
}
}
@ -378,14 +379,16 @@ public final class Client {
private static SslHandler newSslHandler(ClientConfig clientConfig, ByteBufAllocator allocator, HttpAddress httpAddress) {
try {
SslContext sslContext = newSslContext(clientConfig);
SslHandler sslHandler = sslContext.newHandler(allocator);
SslContext sslContext = newSslContext(clientConfig, httpAddress.getVersion());
InetSocketAddress peer = httpAddress.getInetSocketAddress();
SslHandler sslHandler = sslContext.newHandler(allocator, peer.getHostName(), peer.getPort());
SSLEngine engine = sslHandler.engine();
List<String> serverNames = clientConfig.getServerNamesForIdentification();
if (serverNames.isEmpty()) {
serverNames = Collections.singletonList(httpAddress.getInetSocketAddress().getHostName());
serverNames = Collections.singletonList(peer.getHostName());
}
SSLParameters params = engine.getSSLParameters();
// use sslContext.newHandler(allocator, peerHost, peerPort) when using params.setEndpointIdentificationAlgorithm
params.setEndpointIdentificationAlgorithm("HTTPS");
List<SNIServerName> sniServerNames = new ArrayList<>();
for (String serverName : serverNames) {
@ -409,11 +412,11 @@ public final class Client {
}
}
private static SslContext newSslContext(ClientConfig clientConfig) throws SSLException {
private static SslContext newSslContext(ClientConfig clientConfig, HttpVersion httpVersion) throws SSLException {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(clientConfig.getSslProvider())
.ciphers(Http2SecurityUtil.CIPHERS, clientConfig.getCipherSuiteFilter())
.applicationProtocolConfig(newApplicationProtocolConfig());
.applicationProtocolConfig(newApplicationProtocolConfig(httpVersion));
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
@ -423,11 +426,16 @@ public final class Client {
return sslContextBuilder.build();
}
private static ApplicationProtocolConfig newApplicationProtocolConfig() {
return new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2);
private static ApplicationProtocolConfig newApplicationProtocolConfig(HttpVersion httpVersion) {
return httpVersion.majorVersion() == 1 ?
new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_1_1) :
new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2);
}
public interface TransportListener {

View file

@ -192,6 +192,8 @@ public class ClientBuilder {
public ClientBuilder addPoolNode(HttpAddress httpAddress) {
clientConfig.addPoolNode(httpAddress);
clientConfig.setPoolVersion(httpAddress.getVersion());
clientConfig.setPoolSecure(httpAddress.isSecure());
return this;
}
@ -205,16 +207,6 @@ public class ClientBuilder {
return this;
}
public ClientBuilder setPoolVersion(HttpVersion poolVersion) {
clientConfig.setPoolVersion(poolVersion);
return this;
}
public ClientBuilder setPoolSecure(boolean poolSecure) {
clientConfig.setPoolSecure(poolSecure);
return this;
}
public ClientBuilder addServerNameForIdentification(String serverName) {
clientConfig.addServerNameForIdentification(serverName);
return this;

View file

@ -56,7 +56,7 @@ public class HttpAddress implements PoolKey {
}
public static HttpAddress of(Request request) {
return new HttpAddress(request.base(), request.httpVersion());
return new HttpAddress(request.url(), request.httpVersion());
}
public static HttpAddress of(URL url, HttpVersion httpVersion) {
@ -76,7 +76,7 @@ public class HttpAddress implements PoolKey {
public InetSocketAddress getInetSocketAddress() {
if (inetSocketAddress == null) {
// this may execute DNS
// this may execute DNS lookup
this.inetSocketAddress = new InetSocketAddress(host, port);
}
return inetSocketAddress;

View file

@ -1,6 +1,8 @@
package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
@ -12,8 +14,6 @@ import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.retry.BackOff;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@ -21,9 +21,9 @@ import java.util.concurrent.CompletableFuture;
/**
* HTTP client request.
*/
public class Request implements Closeable {
public class Request {
private final URL base;
private final URL url;
private final HttpVersion httpVersion;
@ -62,7 +62,7 @@ public class Request implements Closeable {
String uri, ByteBuf content,
long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount,
boolean isBackOff, BackOff backOff) {
this.base = url;
this.url = url;
this.httpVersion = httpVersion;
this.httpMethod = httpMethod;
this.headers = headers;
@ -77,8 +77,8 @@ public class Request implements Closeable {
this.backOff = backOff;
}
public URL base() {
return base;
public URL url() {
return url;
}
public HttpVersion httpVersion() {
@ -139,12 +139,14 @@ public class Request implements Closeable {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Request[base='").append(base)
sb.append("Request[url='").append(url)
.append("',version=").append(httpVersion)
.append(",method=").append(httpMethod)
.append(",uri=").append(uri)
.append(",headers=").append(headers.entries())
.append(",content=").append(content != null ? content.copy(0,16).toString(StandardCharsets.UTF_8) : "")
.append(",content=").append(content != null && content.readableBytes() >= 16 ?
content.copy(0,16).toString(StandardCharsets.UTF_8) + "..." :
content != null ? content.toString(StandardCharsets.UTF_8) : "")
.append("]");
return sb.toString();
}
@ -222,13 +224,10 @@ public class Request implements Closeable {
}
public static RequestBuilder builder(HttpMethod httpMethod) {
return new RequestBuilder().setMethod(httpMethod);
return new RequestBuilder(PooledByteBufAllocator.DEFAULT).setMethod(httpMethod);
}
@Override
public void close() throws IOException {
if (content != null) {
content.release();
}
public static RequestBuilder builder(ByteBufAllocator allocator, HttpMethod httpMethod) {
return new RequestBuilder(allocator).setMethod(httpMethod);
}
}

View file

@ -1,7 +1,11 @@
package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
@ -51,6 +55,8 @@ public class RequestBuilder {
private static final HttpVersion HTTP_2_0 = HttpVersion.valueOf("HTTP/2.0");
private final ByteBufAllocator allocator;
private final List<String> removeHeaders;
private final Collection<Cookie> cookies;
@ -85,7 +91,8 @@ public class RequestBuilder {
private BackOff backOff;
RequestBuilder() {
RequestBuilder(ByteBufAllocator allocator) {
this.allocator = allocator;
httpMethod = DEFAULT_METHOD;
httpVersion = DEFAULT_HTTP_VERSION;
userAgent = DEFAULT_USER_AGENT;
@ -341,11 +348,11 @@ public class RequestBuilder {
}
private void content(CharSequence charSequence, AsciiString contentType) {
content(charSequence.toString().getBytes(StandardCharsets.UTF_8), contentType);
content(ByteBufUtil.writeUtf8(allocator, charSequence), contentType);
}
private void content(byte[] buf, AsciiString contentType) {
content(PooledByteBufAllocator.DEFAULT.buffer(buf.length).writeBytes(buf), contentType);
content(allocator.buffer().writeBytes(buf), contentType);
}
private void content(ByteBuf body, AsciiString contentType) {

View file

@ -18,6 +18,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
@ -85,10 +86,10 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
}
this.numberOfNodes = nodes.size();
bootstraps = new HashMap<>(numberOfNodes);
channels = new HashMap<>(numberOfNodes);
availableChannels = new HashMap<>(numberOfNodes);
counts = new HashMap<>(numberOfNodes);
failedCounts = new HashMap<>(numberOfNodes);
channels = new ConcurrentHashMap<>(numberOfNodes);
availableChannels = new ConcurrentHashMap<>(numberOfNodes);
counts = new ConcurrentHashMap<>(numberOfNodes);
failedCounts = new ConcurrentHashMap<>(numberOfNodes);
for (K node : nodes) {
ChannelPoolInitializer initializer = new ChannelPoolInitializer(node, channelPoolHandler);
bootstraps.put(node, bootstrap.clone().remoteAddress(node.getInetSocketAddress())
@ -247,6 +248,9 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
int i = ThreadLocalRandom.current().nextInt(numberOfNodes);
for (int j = i; j < numberOfNodes; j ++) {
nextKey = nodes.get(j % numberOfNodes);
if (counts == null) {
throw new ConnectException("strange");
}
next = counts.get(nextKey);
if (next == 0) {
key = nextKey;
@ -293,9 +297,7 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
channel.closeFuture().addListener(new CloseChannelListener(key, channel));
channel.attr(attributeKey).set(key);
channels.computeIfAbsent(key, node -> new ArrayList<>()).add(channel);
synchronized (counts) {
counts.put(key, counts.get(key) + 1);
}
counts.put(key, counts.get(key) + 1);
if (retriesPerNode > 0) {
failedCounts.put(key, 0);
}
@ -343,16 +345,12 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
logger.log(Level.FINE,"connection to " + key + " closed");
lock.lock();
try {
synchronized (counts) {
if (counts.containsKey(key)) {
counts.put(key, counts.get(key) - 1);
}
if (counts.containsKey(key)) {
counts.put(key, counts.get(key) - 1);
}
synchronized (channels) {
List<Channel> channels = BoundedChannelPool.this.channels.get(key);
if (channels != null) {
channels.remove(channel);
}
List<Channel> channels = BoundedChannelPool.this.channels.get(key);
if (channels != null) {
channels.remove(channel);
}
semaphore.release();
} finally {

View file

@ -74,38 +74,34 @@ abstract class BaseTransport implements Transport {
// Our algorithm is: use always "origin form" for HTTP 1, use absolute form for HTTP 2.
// The reason is that Netty derives the HTTP/2 scheme header from the absolute form.
String uri = request.httpVersion().majorVersion() == 1 ?
request.base().relativeReference() : request.base().toString();
request.url().relativeReference() : request.url().toString();
FullHttpRequest fullHttpRequest = request.content() == null ?
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) :
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri,
request.content());
try {
Integer streamId = nextStream();
if (streamId != null && streamId > 0) {
request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
} else {
if (request.httpVersion().majorVersion() == 2) {
logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName());
}
Integer streamId = nextStream();
if (streamId != null && streamId > 0) {
request.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), Integer.toString(streamId));
} else {
if (request.httpVersion().majorVersion() == 2) {
logger.log(Level.WARNING, "no streamId but HTTP/2 request. Strange!!! " + getClass().getName());
}
// add matching cookies from box (previous requests) and new cookies from request builder
Collection<Cookie> cookies = new ArrayList<>();
cookies.addAll(matchCookiesFromBox(request));
cookies.addAll(matchCookies(request));
if (!cookies.isEmpty()) {
request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
}
// add stream-id and cookie headers
fullHttpRequest.headers().set(request.headers());
if (streamId != null) {
requests.put(streamId, request);
}
if (channel.isWritable()) {
channel.writeAndFlush(fullHttpRequest);
}
// add matching cookies from box (previous requests) and new cookies from request builder
Collection<Cookie> cookies = new ArrayList<>();
cookies.addAll(matchCookiesFromBox(request));
cookies.addAll(matchCookies(request));
if (!cookies.isEmpty()) {
request.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
}
// add stream-id and cookie headers
fullHttpRequest.headers().set(request.headers());
if (streamId != null) {
requests.put(streamId, request);
}
if (channel.isWritable()) {
channel.writeAndFlush(fullHttpRequest);
}
} finally {
request.close();
}
return this;
}
@ -214,14 +210,14 @@ abstract class BaseTransport implements Transport {
location = new PercentDecoder(StandardCharsets.UTF_8.newDecoder()).decode(location);
if (location != null) {
logger.log(Level.FINE, "found redirect location: " + location);
URL redirUrl = URL.base(request.base()).resolve(location);
URL redirUrl = URL.base(request.url()).resolve(location);
HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET : request.httpMethod();
RequestBuilder newHttpRequestBuilder = Request.builder(method)
.url(redirUrl)
.setVersion(request.httpVersion())
.setHeaders(request.headers())
.content(request.content());
request.base().getQueryParams().forEach(pair ->
request.url().getQueryParams().forEach(pair ->
newHttpRequestBuilder.addParameter(pair.getFirst(), pair.getSecond())
);
request.cookies().forEach(newHttpRequestBuilder::addCookie);
@ -309,13 +305,13 @@ abstract class BaseTransport implements Transport {
private List<Cookie> matchCookiesFromBox(Request request) {
return cookieBox == null ? Collections.emptyList() : cookieBox.keySet().stream().filter(cookie ->
matchCookie(request.base(), cookie)
matchCookie(request.url(), cookie)
).collect(Collectors.toList());
}
private List<Cookie> matchCookies(Request request) {
return request.cookies().stream().filter(cookie ->
matchCookie(request.base(), cookie)
matchCookie(request.url(), cookie)
).collect(Collectors.toList());
}

View file

@ -69,13 +69,11 @@ public class HttpTransport extends BaseTransport {
if (retryRequest != null) {
// retry transport, wait for completion
client.retry(this, retryRequest);
retryRequest.close();
} else {
Request continueRequest = continuation(request, fullHttpResponse);
if (continueRequest != null) {
// continue with new transport, synchronous call here, wait for completion
client.continuation(this, continueRequest);
continueRequest.close();
}
}
} catch (URLSyntaxException | IOException e) {
@ -94,7 +92,7 @@ public class HttpTransport extends BaseTransport {
}
@Override
public void awaitResponse(Integer streamId) throws IOException {
public void awaitResponse(Integer streamId) throws IOException, TimeoutException {
if (streamId == null) {
return;
}
@ -103,9 +101,17 @@ public class HttpTransport extends BaseTransport {
}
CompletableFuture<Boolean> promise = sequentialPromiseMap.get(streamId);
if (promise != null) {
long millis = client.getClientConfig().getReadTimeoutMillis();
Request request = fromStreamId(streamId);
if (request != null && request.getTimeoutInMillis() > 0) {
millis = request.getTimeoutInMillis();
}
try {
promise.get(client.getClientConfig().getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
promise.get(millis, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
this.throwable = e;
throw new TimeoutException("timeout of " + millis + " milliseconds exceeded");
} catch (InterruptedException | ExecutionException e) {
this.throwable = e;
throw new IOException(e);
} finally {
@ -121,7 +127,7 @@ public class HttpTransport extends BaseTransport {
awaitResponse(streamId);
client.releaseChannel(channel);
}
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
logger.log(Level.WARNING, e.getMessage(), e);
} finally {
sequentialPromiseMap.clear();

View file

@ -12,6 +12,7 @@ import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
public interface Transport {
@ -38,7 +39,7 @@ public interface Transport {
void pushPromiseReceived(Integer streamId, Integer promisedStreamId, Http2Headers headers);
void awaitResponse(Integer streamId) throws IOException;
void awaitResponse(Integer streamId) throws IOException, TimeoutException;
Transport get();

View file

@ -14,19 +14,19 @@ import java.util.logging.Logger;
public class CompletableFutureTest {
private static final Logger logger = Logger.getLogger(ElasticsearchTest.class.getName());
private static final Logger logger = Logger.getLogger(CompletableFutureTest.class.getName());
/**
* Get some weird content from one URL and post it to another URL, by composing completable futures.
*/
@Test
public void testComposeCompletableFutures() throws IOException {
Client client = new Client();
Client client = Client.builder().build();
try {
final Function<FullHttpResponse, String> httpResponseStringFunction = response ->
response.content().toString(StandardCharsets.UTF_8);
Request request = Request.get()
.url("https://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1")
.url("http://repo.maven.apache.org/maven2/org/xbib/netty-http-client/maven-metadata.xml.sha1")
.build();
CompletableFuture<String> completableFuture = client.execute(request, httpResponseStringFunction)
.exceptionally(Throwable::getMessage)

View file

@ -1,6 +1,7 @@
package org.xbib.netty.http.client.test;
import org.conscrypt.Conscrypt;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
@ -17,13 +18,14 @@ public class ConscryptTest extends LoggingBase {
@Test
public void testConscrypt() throws IOException {
Client client = Client.builder()
.enableDebug()
.setJdkSslProvider()
.setSslContextProvider(Conscrypt.newProvider())
.build();
logger.log(Level.INFO, client.getClientConfig().toString());
try {
Request request = Request.get()
.url("https://xbib.org")
.url("https://google.com")
.setVersion("HTTP/1.1")
.build()
.setResponseListener(fullHttpResponse -> {

View file

@ -3,13 +3,16 @@ package org.xbib.netty.http.client.test;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.transport.Transport;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -22,8 +25,27 @@ public class ElasticsearchTest extends LoggingBase {
private static final Logger logger = Logger.getLogger(ElasticsearchTest.class.getName());
@Test
@Ignore
public void testElasticsearch() throws IOException {
Client client = Client.builder().enableDebug().build();
try {
Request request = Request.get().url("http://localhost:9200")
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
});
logger.info("request = " + request.toString());
client.execute(request);
} finally {
client.shutdownGracefully();
}
}
@Test
@Ignore
public void testElasticsearchCreateDocument() throws IOException {
Client client = new Client();
Client client = Client.builder().enableDebug().build();
try {
Request request = Request.put().url("http://localhost:9200/test/test/1")
.json("{\"text\":\"Hello World\"}")
@ -32,6 +54,7 @@ public class ElasticsearchTest extends LoggingBase {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
});
logger.info("request = " + request.toString());
client.execute(request);
} finally {
client.shutdownGracefully();
@ -39,6 +62,7 @@ public class ElasticsearchTest extends LoggingBase {
}
@Test
@Ignore
public void testElasticsearchMatchQuery() throws IOException {
Client client = new Client();
try {
@ -55,24 +79,46 @@ public class ElasticsearchTest extends LoggingBase {
}
}
/**
* This shows the usage of 4 concurrent pooled connections on 4 threads, querying Elasticsearch.
* @throws IOException if test fails
*/
@Test
public void testElasticsearchConcurrent() throws IOException {
Client client = Client.builder().setReadTimeoutMillis(20000).build();
public void testElasticsearchPooled() throws IOException {
HttpAddress httpAddress = HttpAddress.http1("localhost", 9200);
int limit = 4;
Client client = Client.builder()
.addPoolNode(httpAddress)
.setPoolNodeConnectionLimit(limit)
.build();
int max = 1000;
int threads = 4;
try {
List<Request> queries = new ArrayList<>();
for (int i = 0; i < max; i++) {
queries.add(newRequest());
}
Transport transport = client.execute(queries.get(0)).get();
for (int i = 1; i < max; i++) {
transport.execute(queries.get(i)).get();
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int n = 0; n < threads; n++) {
executorService.submit(() -> {
List<Request> queries = new ArrayList<>();
for (int i = 0; i < max; i++) {
queries.add(newRequest());
}
try {
for (int i = 0; i < max; i++) {
client.pooledExecute(queries.get(i)).get();
}
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.log(Level.WARNING, e.getMessage(), e);
} finally {
client.shutdownGracefully();
logger.log(Level.INFO, "count=" + count);
}
assertEquals(max, count.get());
logger.log(Level.INFO, "count=" + count);
assertEquals(max * threads, count.get());
}
private Request newRequest() {
@ -81,10 +127,12 @@ public class ElasticsearchTest extends LoggingBase {
.json("{\"query\":{\"match\":{\"text\":\"Hello World\"}}}")
.addHeader("connection", "keep-alive")
.build()
.setResponseListener(fullHttpResponse ->
logger.log(Level.FINE, "status = " + fullHttpResponse.status() +
" counter = " + count.getAndIncrement() +
" response body = " + fullHttpResponse.content().toString(StandardCharsets.UTF_8)));
.setResponseListener(fullHttpResponse -> {
count.getAndIncrement();
if (fullHttpResponse.status().code() != 200) {
logger.log(Level.WARNING,"error: " + fullHttpResponse.toString());
}
});
}
private final AtomicInteger count = new AtomicInteger();

View file

@ -1,15 +1,12 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -17,16 +14,9 @@ public class Http1Test extends LoggingBase {
private static final Logger logger = Logger.getLogger(Http1Test.class.getName());
@After
public void checkThreads() {
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
logger.log(Level.INFO, "threads = " + threadSet.size() );
threadSet.forEach( thread -> logger.log(Level.INFO, thread.toString()));
}
@Test
public void testHttp1() throws Exception {
Client client = new Client();
Client client = Client.builder().enableDebug().build();
try {
Request request = Request.get().url("http://xbib.org").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
@ -40,7 +30,6 @@ public class Http1Test extends LoggingBase {
}
@Test
@Ignore
public void testParallelRequests() throws IOException {
Client client = Client.builder().enableDebug().build();
try {
@ -70,7 +59,6 @@ public class Http1Test extends LoggingBase {
}
@Test
@Ignore
public void testSequentialRequests() throws Exception {
Client client = Client.builder().enableDebug().build();
try {
@ -79,7 +67,7 @@ public class Http1Test extends LoggingBase {
msg.content().toString(StandardCharsets.UTF_8)));
client.execute(request1).get();
Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build()
Request request2 = Request.get().url("http://google.com").setVersion("HTTP/1.1").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
client.execute(request2).get();

View file

@ -25,7 +25,7 @@ public class Http2Test extends LoggingBase {
*
* demo/h2_demo_frame.html sends no content, only a push promise, and does not continue
*
* @throws IOException
* @throws IOException if test fails
*/
@Test
@Ignore

View file

@ -21,7 +21,7 @@ public class LeakTest {
}
@Test
public void testForLeaks() throws IOException, InterruptedException {
public void testForLeaks() throws IOException {
Client client = new Client();
client.shutdownGracefully();
}

View file

@ -1,6 +1,8 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpVersion;
import org.junit.Test;
import org.xbib.net.URL;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.HttpAddress;
import org.xbib.netty.http.client.Request;
@ -21,15 +23,15 @@ public class PooledClientTest extends LoggingBase {
@Test
public void testPooledClientWithSingleNode() throws IOException {
int loop = 10;
HttpAddress httpAddress = HttpAddress.http1("xbib.org", 80);
int threads = Runtime.getRuntime().availableProcessors();
URL url = URL.from("http://xbib.org");
HttpAddress httpAddress = HttpAddress.of(url, HttpVersion.HTTP_1_1);
Client client = Client.builder()
.addPoolNode(httpAddress)
.setPoolSecure(httpAddress.isSecure())
.setPoolNodeConnectionLimit(16)
.setPoolNodeConnectionLimit(threads)
.build();
AtomicInteger count = new AtomicInteger();
try {
int threads = 16;
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int n = 0; n < threads; n++) {
executorService.submit(() -> {
@ -37,25 +39,26 @@ public class PooledClientTest extends LoggingBase {
logger.log(Level.INFO, "starting " + Thread.currentThread());
for (int i = 0; i < loop; i++) {
Request request = Request.get()
.url("http://xbib.org/repository/")
.setVersion("HTTP/1.1")
.url(url.toString())
.setVersion(httpAddress.getVersion())
//.setTimeoutInMillis(25000L)
.build()
.setResponseListener(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
//logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
count.getAndIncrement();
});
client.pooledExecute(request).get();
count.getAndIncrement();
}
logger.log(Level.INFO, "done " + Thread.currentThread());
} catch (IOException e) {
} catch (Throwable e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
}
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
} catch (Throwable e) {
logger.log(Level.WARNING, e.getMessage(), e);
} finally {
client.shutdownGracefully();

View file

@ -13,7 +13,7 @@ public class RequestBuilderTest {
@Test
public void testSimpleRequest() {
Request request = Request.builder(HttpMethod.GET).build();
Request request = Request.builder(HttpMethod.GET).content("Hello", "text/plain").build();
logger.log(Level.INFO, request.toString());
}
}

View file

@ -0,0 +1,81 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.HttpMethod;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
public class SecureHttp1Test extends LoggingBase {
private static final Logger logger = Logger.getLogger(SecureHttp1Test.class.getName());
@Test
public void testHttp1() throws Exception {
Client client = Client.builder().enableDebug().build();
try {
Request request = Request.get().url("https://www.google.com/").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
client.execute(request).get();
} finally {
client.shutdown();
}
}
@Test
@Ignore
public void testParallelRequests() throws IOException {
Client client = Client.builder().enableDebug().build();
try {
Request request1 = Request.builder(HttpMethod.GET)
.url("https://google.com").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
Request request2 = Request.builder(HttpMethod.GET)
.url("https://google.com").setVersion("HTTP/1.1")
.build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.headers().entries() +
//msg.content().toString(StandardCharsets.UTF_8) +
" status=" + msg.status().code()));
for (int i = 0; i < 10; i++) {
client.execute(request1);
client.execute(request2);
}
} finally {
client.shutdownGracefully();
}
}
@Test
@Ignore
public void testSequentialRequests() throws Exception {
Client client = Client.builder().enableDebug().build();
try {
Request request1 = Request.get().url("https://google.com").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
client.execute(request1).get();
Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build()
.setResponseListener(msg -> logger.log(Level.INFO, "got response: " +
msg.content().toString(StandardCharsets.UTF_8)));
client.execute(request2).get();
} finally {
client.shutdown();
}
}
}

View file

@ -40,14 +40,14 @@ public class EpollTest {
private static final Logger logger = Logger.getLogger(EpollTest.class.getName());
private static final int CONCURRENCY = 10;
private static final int CONCURRENCY = 4;
private static final List<HttpAddress> NODES =
Collections.singletonList(HttpAddress.http1("localhost", 12345));
private static final long TEST_TIME_SECONDS = 100;
private static final int ATTEMPTS = 10_000;
private static final int ATTEMPTS = 1_000;
private static final int FAIL_EVERY_ATTEMPT = 10;

View file

@ -37,14 +37,14 @@ public class NioTest {
private static final Logger logger = Logger.getLogger(NioTest.class.getName());
private static final int CONCURRENCY = 10;
private static final int CONCURRENCY = 4;
private static final List<HttpAddress> NODES =
Collections.singletonList(HttpAddress.http1("localhost", 12345));
private static final long TEST_TIME_SECONDS = 100;
private static final int ATTEMPTS = 10_000;
private static final int ATTEMPTS = 1_000;
private static final int FAIL_EVERY_ATTEMPT = 10;

View file

@ -44,7 +44,6 @@ public class Http2FramesTest {
private static final Logger logger = Logger.getLogger(Http2FramesTest.class.getName());
@Test
@Ignore
public void testHttp2Frames() throws Exception {
final InetSocketAddress inetSocketAddress = new InetSocketAddress("webtide.com", 443);
CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
@ -52,59 +51,59 @@ public class Http2FramesTest {
Channel clientChannel = null;
try {
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
SslContext sslContext = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2))
.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();
String fullQualifiedHostname = inetSocketAddress.getHostName();
SSLParameters params = engine.getSSLParameters();
params.setServerNames(Arrays.asList(new SNIServerName[]{new SNIHostName(fullQualifiedHostname)}));
engine.setSSLParameters(params);
ch.pipeline().addLast(sslHandler);
Http2FrameAdapter frameAdapter = new Http2FrameAdapter() {
@Override
protected void initChannel(Channel ch) throws Exception {
SslContext sslContext = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2))
.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();
String fullQualifiedHostname = inetSocketAddress.getHostName();
SSLParameters params = engine.getSSLParameters();
params.setServerNames(Arrays.asList(new SNIServerName[]{new SNIHostName(fullQualifiedHostname)}));
engine.setSSLParameters(params);
ch.pipeline().addLast(sslHandler);
Http2FrameAdapter frameAdapter = new Http2FrameAdapter() {
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
logger.log(Level.FINE, "settings received, now writing request");
Http2ConnectionHandler handler = ctx.pipeline().get(Http2ConnectionHandler.class);
handler.encoder().writeHeaders(ctx, 3,
new DefaultHttp2Headers().method(HttpMethod.GET.asciiName())
.path("/")
.scheme("https")
.authority(inetSocketAddress.getHostName()),
0, true, ctx.newPromise());
ctx.channel().flush();
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
int i = super.onDataRead(ctx, streamId, data, padding, endOfStream);
if (endOfStream) {
completableFuture.complete(true);
}
return i;
}
};
Http2ConnectionHandlerBuilder builder = new Http2ConnectionHandlerBuilder()
.server(false)
.frameListener(frameAdapter)
.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client"));
ch.pipeline().addLast(builder.build());
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
logger.log(Level.FINE, "settings received, now writing request");
Http2ConnectionHandler handler = ctx.pipeline().get(Http2ConnectionHandler.class);
handler.encoder().writeHeaders(ctx, 3,
new DefaultHttp2Headers().method(HttpMethod.GET.asciiName())
.path("/")
.scheme("https")
.authority(inetSocketAddress.getHostName()),
0, true, ctx.newPromise());
ctx.channel().flush();
}
});
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
int i = super.onDataRead(ctx, streamId, data, padding, endOfStream);
if (endOfStream) {
completableFuture.complete(true);
}
return i;
}
};
Http2ConnectionHandlerBuilder builder = new Http2ConnectionHandlerBuilder()
.server(false)
.frameListener(frameAdapter)
.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client"));
ch.pipeline().addLast(builder.build());
}
});
logger.log(Level.INFO, () -> "connecting");
clientChannel = bootstrap.connect(inetSocketAddress).sync().channel();
logger.log(Level.INFO, () -> "waiting for end of stream");