long thread counters, ensure correct client close, more methods for Request API

This commit is contained in:
Jörg Prante 2019-10-09 17:01:50 +02:00
parent cb2b6a23df
commit 87395152ab
6 changed files with 73 additions and 72 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib group = org.xbib
name = netty-http name = netty-http
version = 4.1.42.0 version = 4.1.42.1
# netty # netty
netty.version = 4.1.42.Final netty.version = 4.1.42.Final

View file

@ -31,8 +31,10 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -394,6 +396,11 @@ public final class Request {
return this; return this;
} }
public Builder setHeaders(Map<String, Object> headers) {
headers.forEach(this::addHeader);
return this;
}
public Builder setHeaders(HttpHeaders headers) { public Builder setHeaders(HttpHeaders headers) {
this.headers = headers; this.headers = headers;
return this; return this;
@ -430,10 +437,23 @@ public final class Request {
return this; return this;
} }
public Builder addParameter(String name, String value) { public Builder setParameters(Map<String, Object> parameters) {
parameters.forEach(this::addParameter);
return this;
}
@SuppressWarnings("unchecked")
public Builder addParameter(String name, Object value) {
Objects.requireNonNull(name); Objects.requireNonNull(name);
Objects.requireNonNull(value); Objects.requireNonNull(value);
uriParameters.addRaw(encode(contentType, name), encode(contentType, value)); Collection<Object> collection;
if (!(value instanceof Collection)) {
collection = Collections.singletonList(value);
} else {
collection = (Collection<Object>) value;
}
String k = encode(contentType, name);
collection.forEach(v -> uriParameters.addRaw(k, encode(contentType, v.toString())));
return this; return this;
} }

View file

@ -1,7 +1,6 @@
package org.xbib.netty.http.client.rest; package org.xbib.netty.http.client.rest;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import org.xbib.net.URL; import org.xbib.net.URL;
import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.Client;
@ -12,12 +11,9 @@ import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Objects;
public class RestClient { public class RestClient {
private static final Client client = new Client();
private HttpResponse response; private HttpResponse response;
private ByteBuf byteBuf; private ByteBuf byteBuf;
@ -25,7 +21,7 @@ public class RestClient {
private RestClient() { private RestClient() {
} }
public void setResponse(HttpResponse response) { private void setResponse(HttpResponse response) {
this.response = response; this.response = response;
this.byteBuf = response != null ? response.getBody().retain() : null; this.byteBuf = response != null ? response.getBody().retain() : null;
} }
@ -38,14 +34,10 @@ public class RestClient {
return asString(StandardCharsets.UTF_8); return asString(StandardCharsets.UTF_8);
} }
public String asString(Charset charset) { private String asString(Charset charset) {
return byteBuf != null && byteBuf.isReadable() ? byteBuf.toString(charset) : null; return byteBuf != null && byteBuf.isReadable() ? byteBuf.toString(charset) : null;
} }
public void close() throws IOException {
client.shutdownGracefully();
}
public static RestClient get(String urlString) throws IOException { public static RestClient get(String urlString) throws IOException {
return method(urlString, HttpMethod.GET); return method(urlString, HttpMethod.GET);
} }
@ -58,42 +50,28 @@ public class RestClient {
return method(urlString, body, StandardCharsets.UTF_8, HttpMethod.POST); return method(urlString, body, StandardCharsets.UTF_8, HttpMethod.POST);
} }
public static RestClient post(String urlString, ByteBuf content) throws IOException {
return method(urlString, content, HttpMethod.POST);
}
public static RestClient put(String urlString, String body) throws IOException { public static RestClient put(String urlString, String body) throws IOException {
return method(urlString, body, StandardCharsets.UTF_8, HttpMethod.PUT); return method(urlString, body, StandardCharsets.UTF_8, HttpMethod.PUT);
} }
public static RestClient put(String urlString, ByteBuf content) throws IOException { private static RestClient method(String urlString, HttpMethod httpMethod) throws IOException {
return method(urlString, content, HttpMethod.PUT); return method(urlString, null, null, httpMethod);
} }
public static RestClient method(String urlString, private static RestClient method(String urlString,
HttpMethod httpMethod) throws IOException { String body, Charset charset,
return method(urlString, Unpooled.buffer(), httpMethod); HttpMethod httpMethod) throws IOException {
}
public static RestClient method(String urlString,
String body, Charset charset,
HttpMethod httpMethod) throws IOException {
Objects.requireNonNull(body);
Objects.requireNonNull(charset);
ByteBuf byteBuf = client.getByteBufAllocator().buffer();
byteBuf.writeCharSequence(body, charset);
return method(urlString, byteBuf, httpMethod);
}
public static RestClient method(String urlString,
ByteBuf byteBuf,
HttpMethod httpMethod) throws IOException {
Objects.requireNonNull(byteBuf);
URL url = URL.create(urlString); URL url = URL.create(urlString);
RestClient restClient = new RestClient(); RestClient restClient = new RestClient();
Request.Builder requestBuilder = Request.builder(httpMethod).url(url); try (Client client = Client.builder()
requestBuilder.content(byteBuf); .setThreadCount(2) // for redirect
try { .build()) {
Request.Builder requestBuilder = Request.builder(httpMethod).url(url);
if (body != null) {
ByteBuf byteBuf = client.getByteBufAllocator().buffer();
byteBuf.writeCharSequence(body, charset);
requestBuilder.content(byteBuf);
}
client.newTransport(HttpAddress.http1(url)) client.newTransport(HttpAddress.http1(url))
.execute(requestBuilder.setResponseListener(restClient::setResponse).build()).close(); .execute(requestBuilder.setResponseListener(restClient::setResponse).build()).close();
} catch (Exception e) { } catch (Exception e) {

View file

@ -57,6 +57,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;
@ -66,8 +67,6 @@ public final class Client implements AutoCloseable {
private static final Logger logger = Logger.getLogger(Client.class.getName()); private static final Logger logger = Logger.getLogger(Client.class.getName());
private static final ThreadFactory httpClientThreadFactory = new HttpClientThreadFactory();
static { static {
if (System.getProperty("xbib.netty.http.client.extendsystemproperties") != null) { if (System.getProperty("xbib.netty.http.client.extendsystemproperties") != null) {
NetworkUtils.extendSystemProperties(); NetworkUtils.extendSystemProperties();
@ -80,9 +79,9 @@ public final class Client implements AutoCloseable {
System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true)); System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true));
} }
} }
private static final AtomicLong requestCounter = new AtomicLong(); private final AtomicLong requestCounter;
private static final AtomicLong responseCounter = new AtomicLong(); private final AtomicLong responseCounter;
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
@ -98,6 +97,8 @@ public final class Client implements AutoCloseable {
private final List<ProtocolProvider<HttpChannelInitializer, Transport>> protocolProviders; private final List<ProtocolProvider<HttpChannelInitializer, Transport>> protocolProviders;
private final AtomicBoolean closed;
private BoundedChannelPool<HttpAddress> pool; private BoundedChannelPool<HttpAddress> pool;
public Client() { public Client() {
@ -112,6 +113,9 @@ public final class Client implements AutoCloseable {
public Client(ClientConfig clientConfig, ByteBufAllocator byteBufAllocator, public Client(ClientConfig clientConfig, ByteBufAllocator byteBufAllocator,
EventLoopGroup eventLoopGroup, Class<? extends SocketChannel> socketChannelClass) { EventLoopGroup eventLoopGroup, Class<? extends SocketChannel> socketChannelClass) {
Objects.requireNonNull(clientConfig); Objects.requireNonNull(clientConfig);
this.requestCounter = new AtomicLong();
this.responseCounter = new AtomicLong();
this.closed = new AtomicBoolean(false);
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.protocolProviders = new ArrayList<>(); this.protocolProviders = new ArrayList<>();
for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) { for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) {
@ -124,8 +128,8 @@ public final class Client implements AutoCloseable {
this.byteBufAllocator = byteBufAllocator != null ? this.byteBufAllocator = byteBufAllocator != null ?
byteBufAllocator : ByteBufAllocator.DEFAULT; byteBufAllocator : ByteBufAllocator.DEFAULT;
this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ? this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ?
new EpollEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory) : new EpollEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()) :
new NioEventLoopGroup(clientConfig.getThreadCount(), httpClientThreadFactory); new NioEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory());
this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ? this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ?
EpollSocketChannel.class : NioSocketChannel.class; EpollSocketChannel.class : NioSocketChannel.class;
this.bootstrap = new Bootstrap() this.bootstrap = new Bootstrap()
@ -328,7 +332,7 @@ public final class Client implements AutoCloseable {
nextTransport.setCookieBox(transport.getCookieBox()); nextTransport.setCookieBox(transport.getCookieBox());
nextTransport.execute(request); nextTransport.execute(request);
nextTransport.get(); nextTransport.get();
close(nextTransport); closeAndRemove(nextTransport);
} }
/** /**
@ -341,16 +345,12 @@ public final class Client implements AutoCloseable {
public void retry(Transport transport, Request request) throws IOException { public void retry(Transport transport, Request request) throws IOException {
transport.execute(request); transport.execute(request);
transport.get(); transport.get();
close(transport); closeAndRemove(transport);
} }
@Override @Override
public void close() { public void close() throws IOException {
try { shutdownGracefully();
shutdownGracefully();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
} }
public void shutdownGracefully() throws IOException { public void shutdownGracefully() throws IOException {
@ -358,22 +358,24 @@ public final class Client implements AutoCloseable {
} }
public void shutdownGracefully(long amount, TimeUnit timeUnit) throws IOException { public void shutdownGracefully(long amount, TimeUnit timeUnit) throws IOException {
logger.log(Level.FINE, "shutting down"); if (closed.compareAndSet(false, true)) {
for (Transport transport : transports) { try {
close(transport); for (Transport transport : transports) {
} transport.close();
if (hasPooledConnections()) { }
pool.close(); transports.clear();
} if (hasPooledConnections()) {
eventLoopGroup.shutdownGracefully(1L, amount, timeUnit); pool.close();
try { }
eventLoopGroup.awaitTermination(amount, timeUnit); eventLoopGroup.shutdownGracefully(1L, amount, timeUnit);
} catch (InterruptedException e) { eventLoopGroup.awaitTermination(amount, timeUnit);
throw new IOException(e); } catch (Exception e) {
throw new IOException(e);
}
} }
} }
private void close(Transport transport) throws IOException { private void closeAndRemove(Transport transport) throws IOException {
try { try {
transport.close(); transport.close();
} catch (Exception e) { } catch (Exception e) {
@ -446,7 +448,7 @@ public final class Client implements AutoCloseable {
static class HttpClientThreadFactory implements ThreadFactory { static class HttpClientThreadFactory implements ThreadFactory {
private int number = 0; private long number = 0;
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {

View file

@ -318,6 +318,7 @@ public abstract class BaseTransport implements Transport {
protected Request retry(Request request, HttpResponse httpResponse) { protected Request retry(Request request, HttpResponse httpResponse) {
if (httpResponse == null) { if (httpResponse == null) {
// no response present, invalid in any way
return null; return null;
} }
if (request == null) { if (request == null) {

View file

@ -361,7 +361,7 @@ public final class Server implements AutoCloseable {
static class HttpServerParentThreadFactory implements ThreadFactory { static class HttpServerParentThreadFactory implements ThreadFactory {
private int number = 0; private long number = 0;
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
@ -373,7 +373,7 @@ public final class Server implements AutoCloseable {
static class HttpServerChildThreadFactory implements ThreadFactory { static class HttpServerChildThreadFactory implements ThreadFactory {
private int number = 0; private long number = 0;
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
@ -385,7 +385,7 @@ public final class Server implements AutoCloseable {
static class BlockingThreadFactory implements ThreadFactory { static class BlockingThreadFactory implements ThreadFactory {
private int number = 0; private long number = 0;
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {