From 8b50868c167b4564f6e7b15b866bf60f17c3efe7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Mon, 13 Jan 2020 14:16:44 +0100 Subject: [PATCH] fix for file descriptor leak --- gradle.properties | 2 +- netty-http-client/build.gradle | 5 +-- .../org/xbib/netty/http/client/Client.java | 45 +++++++++---------- .../handler/http/HttpResponseHandler.java | 3 +- .../handler/http2/Http2ResponseHandler.java | 3 +- .../http/client/transport/BaseTransport.java | 16 ++----- .../http/client/transport/Http1Transport.java | 1 + .../test/http1/FileDescriptorLeakTest.java | 38 ++++++++++++++++ netty-http-server/build.gradle | 11 +++-- .../org/xbib/netty/http/server/Server.java | 4 +- .../server/transport/HttpServerRequest.java | 10 ----- .../http/server/test/http1/CleartextTest.java | 30 ++++++++----- .../http/server/test/http2/CleartextTest.java | 21 ++++----- 13 files changed, 106 insertions(+), 83 deletions(-) create mode 100644 netty-http-client/src/test/java/org/xbib/netty/http/client/test/http1/FileDescriptorLeakTest.java diff --git a/gradle.properties b/gradle.properties index 9c97ef8..d3a3e79 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = netty-http -version = 4.1.44.1 +version = 4.1.44.2 # netty netty.version = 4.1.44.Final diff --git a/netty-http-client/build.gradle b/netty-http-client/build.gradle index ec0361b..f899ce4 100644 --- a/netty-http-client/build.gradle +++ b/netty-http-client/build.gradle @@ -4,11 +4,10 @@ dependencies { compile project(":netty-http-client-api") compile "io.netty:netty-handler-proxy:${project.property('netty.version')}" runtime "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}" - // we are an Mac OS X 10.12, but tcnative > 2.0.26 is for 10.13+ if (Os.isFamily(Os.FAMILY_MAC)) { runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}" - runtime project(':netty-http-kqueue') - } else { + //runtime project(':netty-http-kqueue') + } else if (Os.isFamily(Os.FAMILY_UNIX)) { runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" runtime project(':netty-http-epoll') } diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java index bdf0033..0498c58 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java @@ -22,6 +22,7 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.concurrent.Future; import org.xbib.netty.http.client.api.HttpChannelInitializer; import org.xbib.netty.http.client.api.ProtocolProvider; import org.xbib.netty.http.client.api.Request; @@ -120,43 +121,40 @@ public final class Client implements AutoCloseable { for (ProtocolProvider provider : ServiceLoader.load(ProtocolProvider.class)) { protocolProviders.add(provider); if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "protocol provider up: " + provider.transportClass()); + logger.log(Level.FINEST, "protocol provider: " + provider.transportClass()); } } initializeTrustManagerFactory(clientConfig); this.byteBufAllocator = byteBufAllocator != null ? byteBufAllocator : ByteBufAllocator.DEFAULT; if (eventLoopGroup != null) { this.eventLoopGroup = eventLoopGroup; - } else { - ServiceLoader transportProviders = ServiceLoader.load(TransportProvider.class); - for (TransportProvider transportProvider : transportProviders) { - if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) { - this.eventLoopGroup = transportProvider.createEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()); - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "transport provider event loop group: " + this.eventLoopGroup.getClass().getName()); - } - } + } + if (socketChannelClass != null) { + this.socketChannelClass = socketChannelClass; + } + ServiceLoader transportProviders = ServiceLoader.load(TransportProvider.class); + for (TransportProvider transportProvider : transportProviders) { + if (this.eventLoopGroup == null && + (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName()))) { + this.eventLoopGroup = transportProvider.createEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()); + } + if (this.socketChannelClass == null && + (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName()))) { + this.socketChannelClass = transportProvider.createSocketChannelClass(); } } if (this.eventLoopGroup == null) { this.eventLoopGroup = new NioEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()); } - if (socketChannelClass != null) { - this.socketChannelClass = socketChannelClass; - } else { - ServiceLoader transportProviders = ServiceLoader.load(TransportProvider.class); - for (TransportProvider transportProvider : transportProviders) { - if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) { - this.socketChannelClass = transportProvider.createSocketChannelClass(); - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "transport provider channel: " + this.socketChannelClass.getName()); - } - } - } + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "event loop group class: " + this.eventLoopGroup.getClass().getName()); } if (this.socketChannelClass == null) { this.socketChannelClass = NioSocketChannel.class; } + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "socket channel class: " + this.socketChannelClass.getName()); + } this.bootstrap = new Bootstrap() .group(this.eventLoopGroup) .channel(this.socketChannelClass) @@ -392,8 +390,9 @@ public final class Client implements AutoCloseable { if (hasPooledConnections()) { pool.close(); } - eventLoopGroup.shutdownGracefully(1L, amount, timeUnit); + Future future = eventLoopGroup.shutdownGracefully(1L, amount, timeUnit); eventLoopGroup.awaitTermination(amount, timeUnit); + future.sync(); } catch (Exception e) { throw new IOException(e); } diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/HttpResponseHandler.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/HttpResponseHandler.java index e6b96f3..f533d2c 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/HttpResponseHandler.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/HttpResponseHandler.java @@ -13,12 +13,13 @@ public class HttpResponseHandler extends SimpleChannelInboundHandler { + logger.log(Level.INFO, "status = " + resp.getStatus()); + }) + .build(); + client.execute(request); + } + if (os instanceof UnixOperatingSystemMXBean){ + logger.info("after: number of open file descriptor : " + ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount()); + } + } + } +} diff --git a/netty-http-server/build.gradle b/netty-http-server/build.gradle index 870f433..0b99e18 100644 --- a/netty-http-server/build.gradle +++ b/netty-http-server/build.gradle @@ -3,13 +3,12 @@ import org.apache.tools.ant.taskdefs.condition.Os dependencies { compile project(":netty-http-common") compile project(":netty-http-server-api") - // we are an Mac OS X 10.12, but tcnative > 2.0.26 is for 10.13+ if (Os.isFamily(Os.FAMILY_MAC)) { - runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}" - runtime project(':netty-http-kqueue') - } else { - runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" - runtime project(':netty-http-epoll') + testRuntime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}" + //runtime project(':netty-http-kqueue') + } else if (Os.isFamily(Os.FAMILY_UNIX)) { + testRuntime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" + testRuntime project(':netty-http-epoll') } testCompile project(":netty-http-client") testRuntime project(":netty-http-bouncycastle") diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java index 756daa4..0840093 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java @@ -63,9 +63,9 @@ public final class Server implements AutoCloseable { } } - private static final AtomicLong requestCounter = new AtomicLong(); + private static final AtomicLong requestCounter = new AtomicLong(0); - private static final AtomicLong responseCounter = new AtomicLong(); + private static final AtomicLong responseCounter = new AtomicLong(0); private final ServerConfig serverConfig; diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java index 889e465..b35c359 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java @@ -68,9 +68,6 @@ public class HttpServerRequest implements ServerRequest { } void handleParameters() { - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, () -> "request = " + httpRequest); - } Charset charset = HttpUtil.getCharset(httpRequest, StandardCharsets.UTF_8); this.url = URL.builder() .charset(charset, CodingErrorAction.REPLACE) @@ -79,13 +76,6 @@ public class HttpServerRequest implements ServerRequest { QueryParameters queryParameters = url.getQueryParams(); CharSequence mimeType = HttpUtil.getMimeType(httpRequest); ByteBuf byteBuf = httpRequest.content(); - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, () -> "url = " + url + - " charset = " + charset + - " mime type = " + mimeType + - " queryParameters = " + queryParameters + - " body exists = " + (byteBuf != null)); - } if (byteBuf != null) { if (httpRequest.method().equals(HttpMethod.POST)) { String params; diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java index d8b210f..8c23866 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java @@ -108,9 +108,9 @@ class CleartextTest { } @Test - void testMultithreadedPooledClearTextHttp1() throws Exception { - int threads = 8; - int loop = 1000; + void testMultithreadPooledClearTextHttp1() throws Exception { + int threads = 2; + int loop = 1024; HttpAddress httpAddress = HttpAddress.http1("localhost", 8008); Domain domain = Domain.builder(httpAddress) .singleEndpoint("/**", (request, response) -> @@ -123,10 +123,9 @@ class CleartextTest { .addPoolNode(httpAddress) .setPoolNodeConnectionLimit(threads) .build(); - AtomicInteger counter = new AtomicInteger(); + AtomicInteger counter = new AtomicInteger(0); final ResponseListener responseListener = resp -> { if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { - logger.log(Level.FINE, resp.getBodyAsString(StandardCharsets.UTF_8)); counter.incrementAndGet(); } }; @@ -138,34 +137,41 @@ class CleartextTest { try { for (int i = 0; i < loop; i++) { String payload = t + "/" + i; - Request request = Request.get().setVersion(HttpVersion.HTTP_1_1) + Request request = Request.get() + .setVersion(HttpVersion.HTTP_1_1) .url(server.getServerConfig().getAddress().base()) .content(payload, "text/plain") .setResponseListener(responseListener) .build(); - // note: a new transport is created per execution + // note: in HTTP 1, a new transport is created per execution Transport transport = client.newTransport(); transport.execute(request); if (transport.isFailed()) { logger.log(Level.WARNING, "transport failed: " + transport.getFailure().getMessage(), transport.getFailure()); break; } - transport.get(); + transport.get(20L, TimeUnit.SECONDS); } - } catch (Exception e) { + } catch (Throwable e) { logger.log(Level.SEVERE, e.getMessage(), e); } }); } executorService.shutdown(); - boolean terminated = executorService.awaitTermination(30, TimeUnit.SECONDS); + boolean terminated = executorService.awaitTermination(20L, TimeUnit.SECONDS); + executorService.shutdownNow(); logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete"); } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); } finally { - client.shutdownGracefully(); - server.shutdownGracefully(); + server.shutdownGracefully(20L, TimeUnit.SECONDS); + client.shutdownGracefully(20L, TimeUnit.SECONDS); } + logger.log(Level.INFO, "server requests = " + server.getRequestCounter() + + " server responses = " + server.getResponseCounter()); + logger.log(Level.INFO, "client requests = " + client.getRequestCounter() + + " client responses = " + client.getResponseCounter()); + logger.log(Level.INFO, "expected=" + (threads * loop) + " counter=" + counter.get()); assertEquals(threads * loop, counter.get()); } } diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java index 6fbb726..c905713 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java @@ -131,29 +131,24 @@ class CleartextTest { int loop = 1024; HttpAddress httpAddress = HttpAddress.http2("localhost", 8008); Domain domain = Domain.builder(httpAddress) - .singleEndpoint("/", (request, response) -> + .singleEndpoint("/**", (request, response) -> ServerResponse.write(response, HttpResponseStatus.OK, "text/plain", - request.getContent().retain())) - .build(); - Server server = Server.builder(domain) + request.getContent().toString(StandardCharsets.UTF_8))) .build(); + Server server = Server.builder(domain).build(); server.accept(); Client client = Client.builder() .addPoolNode(httpAddress) .setPoolNodeConnectionLimit(threads) .build(); - AtomicInteger counter = new AtomicInteger(); - // a HTTP/2 listener always receives responses out-of-order + AtomicInteger counter = new AtomicInteger(0); final ResponseListener responseListener = resp -> { - if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { + if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { counter.incrementAndGet(); - } else { - logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() + - " response body = " + resp.getBodyAsString(StandardCharsets.UTF_8)); } }; try { - // note: for HTTP/2 only, we use a single shared transport + // note: for HTTP/2 only, we can use a single shared transport final Transport transport = client.newTransport(); ExecutorService executorService = Executors.newFixedThreadPool(threads); for (int n = 0; n < threads; n++) { @@ -162,7 +157,8 @@ class CleartextTest { try { for (int i = 0; i < loop; i++) { String payload = t + "/" + i; - Request request = Request.get().setVersion("HTTP/2.0") + Request request = Request.get() + .setVersion("HTTP/2.0") .url(server.getServerConfig().getAddress().base()) .content(payload, "text/plain") .setResponseListener(responseListener) @@ -183,6 +179,7 @@ class CleartextTest { executorService.shutdownNow(); logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete"); transport.get(20L, TimeUnit.SECONDS); + logger.log(Level.INFO, "transport complete"); } finally { server.shutdownGracefully(20L, TimeUnit.SECONDS); client.shutdownGracefully(20L, TimeUnit.SECONDS);