From 87395152ab4d8c0b9e242a3a25e202019bca366e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 9 Oct 2019 17:01:50 +0200 Subject: [PATCH] long thread counters, ensure correct client close, more methods for Request API --- gradle.properties | 2 +- .../xbib/netty/http/client/api/Request.java | 24 +++++++- .../netty/http/client/rest/RestClient.java | 54 +++++------------ .../org/xbib/netty/http/client/Client.java | 58 ++++++++++--------- .../http/client/transport/BaseTransport.java | 1 + .../org/xbib/netty/http/server/Server.java | 6 +- 6 files changed, 73 insertions(+), 72 deletions(-) diff --git a/gradle.properties b/gradle.properties index 6623594..1e7bef4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = netty-http -version = 4.1.42.0 +version = 4.1.42.1 # netty netty.version = 4.1.42.Final diff --git a/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java b/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java index dc0741d..aa760ca 100644 --- a/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java +++ b/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java @@ -31,8 +31,10 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -394,6 +396,11 @@ public final class Request { return this; } + public Builder setHeaders(Map headers) { + headers.forEach(this::addHeader); + return this; + } + public Builder setHeaders(HttpHeaders headers) { this.headers = headers; return this; @@ -430,10 +437,23 @@ public final class Request { return this; } - public Builder addParameter(String name, String value) { + public Builder setParameters(Map parameters) { + parameters.forEach(this::addParameter); + return this; + } + + @SuppressWarnings("unchecked") + public Builder addParameter(String name, Object value) { Objects.requireNonNull(name); Objects.requireNonNull(value); - uriParameters.addRaw(encode(contentType, name), encode(contentType, value)); + Collection collection; + if (!(value instanceof Collection)) { + collection = Collections.singletonList(value); + } else { + collection = (Collection) value; + } + String k = encode(contentType, name); + collection.forEach(v -> uriParameters.addRaw(k, encode(contentType, v.toString()))); return this; } diff --git a/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java b/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java index 656b5f5..abd0c7f 100644 --- a/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java +++ b/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java @@ -1,7 +1,6 @@ package org.xbib.netty.http.client.rest; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpMethod; import org.xbib.net.URL; import org.xbib.netty.http.client.Client; @@ -12,12 +11,9 @@ import org.xbib.netty.http.common.HttpResponse; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Objects; public class RestClient { - private static final Client client = new Client(); - private HttpResponse response; private ByteBuf byteBuf; @@ -25,7 +21,7 @@ public class RestClient { private RestClient() { } - public void setResponse(HttpResponse response) { + private void setResponse(HttpResponse response) { this.response = response; this.byteBuf = response != null ? response.getBody().retain() : null; } @@ -38,14 +34,10 @@ public class RestClient { return asString(StandardCharsets.UTF_8); } - public String asString(Charset charset) { + private String asString(Charset charset) { return byteBuf != null && byteBuf.isReadable() ? byteBuf.toString(charset) : null; } - public void close() throws IOException { - client.shutdownGracefully(); - } - public static RestClient get(String urlString) throws IOException { return method(urlString, HttpMethod.GET); } @@ -58,42 +50,28 @@ public class RestClient { return method(urlString, body, StandardCharsets.UTF_8, HttpMethod.POST); } - public static RestClient post(String urlString, ByteBuf content) throws IOException { - return method(urlString, content, HttpMethod.POST); - } - public static RestClient put(String urlString, String body) throws IOException { return method(urlString, body, StandardCharsets.UTF_8, HttpMethod.PUT); } - public static RestClient put(String urlString, ByteBuf content) throws IOException { - return method(urlString, content, HttpMethod.PUT); + private static RestClient method(String urlString, HttpMethod httpMethod) throws IOException { + return method(urlString, null, null, httpMethod); } - public static RestClient method(String urlString, - HttpMethod httpMethod) throws IOException { - return method(urlString, Unpooled.buffer(), httpMethod); - } - - public static RestClient method(String urlString, - String body, Charset charset, - HttpMethod httpMethod) throws IOException { - Objects.requireNonNull(body); - Objects.requireNonNull(charset); - ByteBuf byteBuf = client.getByteBufAllocator().buffer(); - byteBuf.writeCharSequence(body, charset); - return method(urlString, byteBuf, httpMethod); - } - - public static RestClient method(String urlString, - ByteBuf byteBuf, - HttpMethod httpMethod) throws IOException { - Objects.requireNonNull(byteBuf); + private static RestClient method(String urlString, + String body, Charset charset, + HttpMethod httpMethod) throws IOException { URL url = URL.create(urlString); RestClient restClient = new RestClient(); - Request.Builder requestBuilder = Request.builder(httpMethod).url(url); - requestBuilder.content(byteBuf); - try { + try (Client client = Client.builder() + .setThreadCount(2) // for redirect + .build()) { + Request.Builder requestBuilder = Request.builder(httpMethod).url(url); + if (body != null) { + ByteBuf byteBuf = client.getByteBufAllocator().buffer(); + byteBuf.writeCharSequence(body, charset); + requestBuilder.content(byteBuf); + } client.newTransport(HttpAddress.http1(url)) .execute(requestBuilder.setResponseListener(restClient::setResponse).build()).close(); } catch (Exception e) { diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java index ec75f3a..6662031 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java @@ -57,6 +57,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.logging.Level; @@ -66,8 +67,6 @@ public final class Client implements AutoCloseable { private static final Logger logger = Logger.getLogger(Client.class.getName()); - private static final ThreadFactory httpClientThreadFactory = new HttpClientThreadFactory(); - static { if (System.getProperty("xbib.netty.http.client.extendsystemproperties") != null) { NetworkUtils.extendSystemProperties(); @@ -80,9 +79,9 @@ public final class Client implements AutoCloseable { System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true)); } } - private static final AtomicLong requestCounter = new AtomicLong(); + private final AtomicLong requestCounter; - private static final AtomicLong responseCounter = new AtomicLong(); + private final AtomicLong responseCounter; private final ClientConfig clientConfig; @@ -98,6 +97,8 @@ public final class Client implements AutoCloseable { private final List> protocolProviders; + private final AtomicBoolean closed; + private BoundedChannelPool pool; public Client() { @@ -112,6 +113,9 @@ public final class Client implements AutoCloseable { public Client(ClientConfig clientConfig, ByteBufAllocator byteBufAllocator, EventLoopGroup eventLoopGroup, Class socketChannelClass) { Objects.requireNonNull(clientConfig); + this.requestCounter = new AtomicLong(); + this.responseCounter = new AtomicLong(); + this.closed = new AtomicBoolean(false); this.clientConfig = clientConfig; this.protocolProviders = new ArrayList<>(); for (ProtocolProvider provider : ServiceLoader.load(ProtocolProvider.class)) { @@ -124,8 +128,8 @@ public final class Client implements AutoCloseable { this.byteBufAllocator = byteBufAllocator != null ? byteBufAllocator : ByteBufAllocator.DEFAULT; this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ? - new EpollEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory) : - new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory); + new EpollEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()) : + new NioEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()); this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ? EpollSocketChannel.class : NioSocketChannel.class; this.bootstrap = new Bootstrap() @@ -328,7 +332,7 @@ public final class Client implements AutoCloseable { nextTransport.setCookieBox(transport.getCookieBox()); nextTransport.execute(request); nextTransport.get(); - close(nextTransport); + closeAndRemove(nextTransport); } /** @@ -341,16 +345,12 @@ public final class Client implements AutoCloseable { public void retry(Transport transport, Request request) throws IOException { transport.execute(request); transport.get(); - close(transport); + closeAndRemove(transport); } @Override - public void close() { - try { - shutdownGracefully(); - } catch (IOException e) { - logger.log(Level.SEVERE, e.getMessage(), e); - } + public void close() throws IOException { + shutdownGracefully(); } public void shutdownGracefully() throws IOException { @@ -358,22 +358,24 @@ public final class Client implements AutoCloseable { } public void shutdownGracefully(long amount, TimeUnit timeUnit) throws IOException { - logger.log(Level.FINE, "shutting down"); - for (Transport transport : transports) { - close(transport); - } - if (hasPooledConnections()) { - pool.close(); - } - eventLoopGroup.shutdownGracefully(1L, amount, timeUnit); - try { - eventLoopGroup.awaitTermination(amount, timeUnit); - } catch (InterruptedException e) { - throw new IOException(e); + if (closed.compareAndSet(false, true)) { + try { + for (Transport transport : transports) { + transport.close(); + } + transports.clear(); + if (hasPooledConnections()) { + pool.close(); + } + eventLoopGroup.shutdownGracefully(1L, amount, timeUnit); + eventLoopGroup.awaitTermination(amount, timeUnit); + } catch (Exception e) { + throw new IOException(e); + } } } - private void close(Transport transport) throws IOException { + private void closeAndRemove(Transport transport) throws IOException { try { transport.close(); } catch (Exception e) { @@ -446,7 +448,7 @@ public final class Client implements AutoCloseable { static class HttpClientThreadFactory implements ThreadFactory { - private int number = 0; + private long number = 0; @Override public Thread newThread(Runnable runnable) { diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index 52a1adf..a6fe8a0 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -318,6 +318,7 @@ public abstract class BaseTransport implements Transport { protected Request retry(Request request, HttpResponse httpResponse) { if (httpResponse == null) { + // no response present, invalid in any way return null; } if (request == null) { diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java index afc187a..6155c65 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java @@ -361,7 +361,7 @@ public final class Server implements AutoCloseable { static class HttpServerParentThreadFactory implements ThreadFactory { - private int number = 0; + private long number = 0; @Override public Thread newThread(Runnable runnable) { @@ -373,7 +373,7 @@ public final class Server implements AutoCloseable { static class HttpServerChildThreadFactory implements ThreadFactory { - private int number = 0; + private long number = 0; @Override public Thread newThread(Runnable runnable) { @@ -385,7 +385,7 @@ public final class Server implements AutoCloseable { static class BlockingThreadFactory implements ThreadFactory { - private int number = 0; + private long number = 0; @Override public Thread newThread(Runnable runnable) {