fix for file descriptor leak

This commit is contained in:
Jörg Prante 2020-01-13 14:16:44 +01:00
parent b980dc9b9c
commit 8b50868c16
13 changed files with 106 additions and 83 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib
name = netty-http
version = 4.1.44.1
version = 4.1.44.2
# netty
netty.version = 4.1.44.Final

View file

@ -4,11 +4,10 @@ dependencies {
compile project(":netty-http-client-api")
compile "io.netty:netty-handler-proxy:${project.property('netty.version')}"
runtime "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
// we are an Mac OS X 10.12, but tcnative > 2.0.26 is for 10.13+
if (Os.isFamily(Os.FAMILY_MAC)) {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}"
runtime project(':netty-http-kqueue')
} else {
//runtime project(':netty-http-kqueue')
} else if (Os.isFamily(Os.FAMILY_UNIX)) {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
runtime project(':netty-http-epoll')
}

View file

@ -22,6 +22,7 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import org.xbib.netty.http.client.api.HttpChannelInitializer;
import org.xbib.netty.http.client.api.ProtocolProvider;
import org.xbib.netty.http.client.api.Request;
@ -120,43 +121,40 @@ public final class Client implements AutoCloseable {
for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) {
protocolProviders.add(provider);
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "protocol provider up: " + provider.transportClass());
logger.log(Level.FINEST, "protocol provider: " + provider.transportClass());
}
}
initializeTrustManagerFactory(clientConfig);
this.byteBufAllocator = byteBufAllocator != null ? byteBufAllocator : ByteBufAllocator.DEFAULT;
if (eventLoopGroup != null) {
this.eventLoopGroup = eventLoopGroup;
} else {
ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
for (TransportProvider transportProvider : transportProviders) {
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
this.eventLoopGroup = transportProvider.createEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory());
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "transport provider event loop group: " + this.eventLoopGroup.getClass().getName());
}
}
}
if (socketChannelClass != null) {
this.socketChannelClass = socketChannelClass;
}
ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
for (TransportProvider transportProvider : transportProviders) {
if (this.eventLoopGroup == null &&
(clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName()))) {
this.eventLoopGroup = transportProvider.createEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory());
}
if (this.socketChannelClass == null &&
(clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName()))) {
this.socketChannelClass = transportProvider.createSocketChannelClass();
}
}
if (this.eventLoopGroup == null) {
this.eventLoopGroup = new NioEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory());
}
if (socketChannelClass != null) {
this.socketChannelClass = socketChannelClass;
} else {
ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
for (TransportProvider transportProvider : transportProviders) {
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
this.socketChannelClass = transportProvider.createSocketChannelClass();
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "transport provider channel: " + this.socketChannelClass.getName());
}
}
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "event loop group class: " + this.eventLoopGroup.getClass().getName());
}
if (this.socketChannelClass == null) {
this.socketChannelClass = NioSocketChannel.class;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "socket channel class: " + this.socketChannelClass.getName());
}
this.bootstrap = new Bootstrap()
.group(this.eventLoopGroup)
.channel(this.socketChannelClass)
@ -392,8 +390,9 @@ public final class Client implements AutoCloseable {
if (hasPooledConnections()) {
pool.close();
}
eventLoopGroup.shutdownGracefully(1L, amount, timeUnit);
Future<?> future = eventLoopGroup.shutdownGracefully(1L, amount, timeUnit);
eventLoopGroup.awaitTermination(amount, timeUnit);
future.sync();
} catch (Exception e) {
throw new IOException(e);
}

View file

@ -13,12 +13,13 @@ public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpRes
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception {
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
transport.responseReceived(ctx.channel(), null, fullHttpResponse);
// do not close ctx here
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
transport.fail(cause);
ctx.channel().close();
// do not close ctx here
}
}

View file

@ -15,12 +15,13 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
Integer streamId = httpResponse.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
transport.responseReceived(ctx.channel(), streamId, httpResponse);
// do not close ctx here
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
transport.fail(cause);
ctx.channel().close();
// do not close ctx here
}
}

View file

@ -102,15 +102,11 @@ public abstract class BaseTransport implements Transport {
@Override
public void close() {
// channels are present, maybe forgot a get() to receive responses?
if (!channels.isEmpty()) {
get();
}
for (Flow flow : flowMap.values()) {
flow.close();
}
channels.clear();
httpDataFactory.cleanAllHttpData();
// do not clear requests
cancel();
}
@Override
@ -160,12 +156,7 @@ public abstract class BaseTransport implements Transport {
flow.get(key).get(value, timeUnit);
completeRequest(requestKey);
} catch (Exception e) {
if (requestKey != null) {
Request request = requests.get(requestKey);
if (request != null && request.getCompletableFuture() != null) {
request.getCompletableFuture().completeExceptionally(e);
}
}
completeRequestExceptionally(requestKey, e);
flow.fail(e);
} finally {
flow.remove(key);
@ -217,6 +208,7 @@ public abstract class BaseTransport implements Transport {
flowMap.clear();
channels.clear();
requests.clear();
httpDataFactory.cleanAllHttpData();
}
@Override

View file

@ -1,6 +1,7 @@
package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;

View file

@ -0,0 +1,38 @@
package org.xbib.netty.http.client.test.http1;
import com.sun.management.UnixOperatingSystemMXBean;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.api.Request;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.logging.Level;
import java.util.logging.Logger;
@Disabled
class FileDescriptorLeakTest {
private static final Logger logger = Logger.getLogger(FileDescriptorLeakTest.class.getName());
@Test
void testFileLeak() throws Exception {
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
for (int i = 0; i< 10; i++) {
if (os instanceof UnixOperatingSystemMXBean) {
logger.info("before: number of open file descriptor : " + ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount());
}
try (Client client = Client.builder().setThreadCount(1).build()) {
Request request = Request.get().url("http://xbib.org")
.setResponseListener(resp -> {
logger.log(Level.INFO, "status = " + resp.getStatus());
})
.build();
client.execute(request);
}
if (os instanceof UnixOperatingSystemMXBean){
logger.info("after: number of open file descriptor : " + ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount());
}
}
}
}

View file

@ -3,13 +3,12 @@ import org.apache.tools.ant.taskdefs.condition.Os
dependencies {
compile project(":netty-http-common")
compile project(":netty-http-server-api")
// we are an Mac OS X 10.12, but tcnative > 2.0.26 is for 10.13+
if (Os.isFamily(Os.FAMILY_MAC)) {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}"
runtime project(':netty-http-kqueue')
} else {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
runtime project(':netty-http-epoll')
testRuntime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}"
//runtime project(':netty-http-kqueue')
} else if (Os.isFamily(Os.FAMILY_UNIX)) {
testRuntime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
testRuntime project(':netty-http-epoll')
}
testCompile project(":netty-http-client")
testRuntime project(":netty-http-bouncycastle")

View file

@ -63,9 +63,9 @@ public final class Server implements AutoCloseable {
}
}
private static final AtomicLong requestCounter = new AtomicLong();
private static final AtomicLong requestCounter = new AtomicLong(0);
private static final AtomicLong responseCounter = new AtomicLong();
private static final AtomicLong responseCounter = new AtomicLong(0);
private final ServerConfig serverConfig;

View file

@ -68,9 +68,6 @@ public class HttpServerRequest implements ServerRequest {
}
void handleParameters() {
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, () -> "request = " + httpRequest);
}
Charset charset = HttpUtil.getCharset(httpRequest, StandardCharsets.UTF_8);
this.url = URL.builder()
.charset(charset, CodingErrorAction.REPLACE)
@ -79,13 +76,6 @@ public class HttpServerRequest implements ServerRequest {
QueryParameters queryParameters = url.getQueryParams();
CharSequence mimeType = HttpUtil.getMimeType(httpRequest);
ByteBuf byteBuf = httpRequest.content();
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, () -> "url = " + url +
" charset = " + charset +
" mime type = " + mimeType +
" queryParameters = " + queryParameters +
" body exists = " + (byteBuf != null));
}
if (byteBuf != null) {
if (httpRequest.method().equals(HttpMethod.POST)) {
String params;

View file

@ -108,9 +108,9 @@ class CleartextTest {
}
@Test
void testMultithreadedPooledClearTextHttp1() throws Exception {
int threads = 8;
int loop = 1000;
void testMultithreadPooledClearTextHttp1() throws Exception {
int threads = 2;
int loop = 1024;
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/**", (request, response) ->
@ -123,10 +123,9 @@ class CleartextTest {
.addPoolNode(httpAddress)
.setPoolNodeConnectionLimit(threads)
.build();
AtomicInteger counter = new AtomicInteger();
AtomicInteger counter = new AtomicInteger(0);
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
logger.log(Level.FINE, resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
}
};
@ -138,34 +137,41 @@ class CleartextTest {
try {
for (int i = 0; i < loop; i++) {
String payload = t + "/" + i;
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
Request request = Request.get()
.setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base())
.content(payload, "text/plain")
.setResponseListener(responseListener)
.build();
// note: a new transport is created per execution
// note: in HTTP 1, a new transport is created per execution
Transport transport = client.newTransport();
transport.execute(request);
if (transport.isFailed()) {
logger.log(Level.WARNING, "transport failed: " + transport.getFailure().getMessage(), transport.getFailure());
break;
}
transport.get();
transport.get(20L, TimeUnit.SECONDS);
}
} catch (Exception e) {
} catch (Throwable e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(30, TimeUnit.SECONDS);
boolean terminated = executorService.awaitTermination(20L, TimeUnit.SECONDS);
executorService.shutdownNow();
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} finally {
client.shutdownGracefully();
server.shutdownGracefully();
server.shutdownGracefully(20L, TimeUnit.SECONDS);
client.shutdownGracefully(20L, TimeUnit.SECONDS);
}
logger.log(Level.INFO, "server requests = " + server.getRequestCounter() +
" server responses = " + server.getResponseCounter());
logger.log(Level.INFO, "client requests = " + client.getRequestCounter() +
" client responses = " + client.getResponseCounter());
logger.log(Level.INFO, "expected=" + (threads * loop) + " counter=" + counter.get());
assertEquals(threads * loop, counter.get());
}
}

View file

@ -131,29 +131,24 @@ class CleartextTest {
int loop = 1024;
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/", (request, response) ->
.singleEndpoint("/**", (request, response) ->
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getContent().retain()))
.build();
Server server = Server.builder(domain)
request.getContent().toString(StandardCharsets.UTF_8)))
.build();
Server server = Server.builder(domain).build();
server.accept();
Client client = Client.builder()
.addPoolNode(httpAddress)
.setPoolNodeConnectionLimit(threads)
.build();
AtomicInteger counter = new AtomicInteger();
// a HTTP/2 listener always receives responses out-of-order
AtomicInteger counter = new AtomicInteger(0);
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
counter.incrementAndGet();
} else {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
}
};
try {
// note: for HTTP/2 only, we use a single shared transport
// note: for HTTP/2 only, we can use a single shared transport
final Transport transport = client.newTransport();
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int n = 0; n < threads; n++) {
@ -162,7 +157,8 @@ class CleartextTest {
try {
for (int i = 0; i < loop; i++) {
String payload = t + "/" + i;
Request request = Request.get().setVersion("HTTP/2.0")
Request request = Request.get()
.setVersion("HTTP/2.0")
.url(server.getServerConfig().getAddress().base())
.content(payload, "text/plain")
.setResponseListener(responseListener)
@ -183,6 +179,7 @@ class CleartextTest {
executorService.shutdownNow();
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
transport.get(20L, TimeUnit.SECONDS);
logger.log(Level.INFO, "transport complete");
} finally {
server.shutdownGracefully(20L, TimeUnit.SECONDS);
client.shutdownGracefully(20L, TimeUnit.SECONDS);